sui_core/
rpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use rayon::iter::IntoParallelIterator;
12use rayon::iter::ParallelIterator;
13use serde::Deserialize;
14use serde::Serialize;
15use std::collections::{BTreeMap, HashMap};
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::time::Duration;
21use std::time::Instant;
22use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
23use sui_types::base_types::MoveObjectType;
24use sui_types::base_types::ObjectID;
25use sui_types::base_types::SequenceNumber;
26use sui_types::base_types::SuiAddress;
27use sui_types::coin::Coin;
28use sui_types::committee::EpochId;
29use sui_types::digests::TransactionDigest;
30use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
31use sui_types::full_checkpoint_content::CheckpointData;
32use sui_types::layout_resolver::LayoutResolver;
33use sui_types::messages_checkpoint::CheckpointContents;
34use sui_types::messages_checkpoint::CheckpointSequenceNumber;
35use sui_types::object::Data;
36use sui_types::object::Object;
37use sui_types::object::Owner;
38use sui_types::storage::BackingPackageStore;
39use sui_types::storage::DynamicFieldKey;
40use sui_types::storage::EpochInfo;
41use sui_types::storage::TransactionInfo;
42use sui_types::storage::error::Error as StorageError;
43use sui_types::sui_system_state::SuiSystemStateTrait;
44use sui_types::transaction::{TransactionDataAPI, TransactionKind};
45use sysinfo::{MemoryRefreshKind, RefreshKind, System};
46use tracing::{debug, info, warn};
47use typed_store::DBMapUtils;
48use typed_store::TypedStoreError;
49use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
50use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
51use typed_store::traits::Map;
52
/// Schema version of this index database. Bump this whenever a table is added
/// or an existing table's key/value encoding changes (see `IndexStoreTables`).
const CURRENT_DB_VERSION: u64 = 3;
// Number of buffered balance entries accumulated before flushing to the DB.
// I tried increasing this to 100k and 1M and it didn't speed up indexing at all.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;
56
57fn bulk_ingestion_write_options() -> WriteOptions {
58    let mut opts = WriteOptions::default();
59    opts.disable_wal(true);
60    opts
61}
62
63/// Get available memory, respecting cgroup limits in containerized environments
64fn get_available_memory() -> u64 {
65    // RefreshKind::nothing().with_memory() avoids collecting other, slower stats
66    let mut sys = System::new_with_specifics(
67        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
68    );
69    sys.refresh_memory();
70
71    // Check if we have cgroup limits
72    if let Some(cgroup_limits) = sys.cgroup_limits() {
73        let memory_limit = cgroup_limits.total_memory;
74        // cgroup_limits.total_memory is 0 when there's no limit
75        if memory_limit > 0 {
76            debug!("Using cgroup memory limit: {} bytes", memory_limit);
77            return memory_limit;
78        }
79    }
80
81    // Fall back to system memory if no cgroup limits found
82    // sysinfo 0.35 already reports bytes (not KiB like older versions)
83    let total_memory_bytes = sys.total_memory();
84    debug!("Using system memory: {} bytes", total_memory_bytes);
85    total_memory_bytes
86}
87
/// Singleton value stored in the `meta` table describing the database itself.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}
93
/// Checkpoint watermark type
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Highest checkpoint sequence number that has been indexed.
    Indexed,
    /// Highest checkpoint sequence number that has been pruned.
    Pruned,
}
100
/// Key for the `owner` table. The derived `Ord` sorts an owner's objects by
/// type, then by descending coin balance (via the inverted encoding), then by
/// object id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    pub object_type: StructTag,

    // If this object is coin-like (eg 0x2::coin::Coin) then this will be the balance of the coin
    // inverted `!coin.balance` in order to force sorting of coins to be from greatest to least
    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}
113
114impl OwnerIndexKey {
115    // Creates a key from the provided object.
116    // Panics if the provided object is not an Address owned object
117    fn from_object(object: &Object) -> Self {
118        let owner = match object.owner() {
119            Owner::AddressOwner(owner) => owner,
120            Owner::ConsensusAddressOwner { owner, .. } => owner,
121            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
122        };
123        let object_type = object.struct_tag().expect("packages cannot be owned");
124
125        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
126
127        Self {
128            owner: *owner,
129            object_type,
130            inverted_balance,
131            object_id: object.id(),
132        }
133    }
134}
135
/// Value for the `owner` table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // object_id and type of this object are a part of the key
    pub version: SequenceNumber,
}
141
142impl OwnerIndexInfo {
143    pub fn new(object: &Object) -> Self {
144        Self {
145            version: object.version(),
146        }
147    }
148}
149
/// Key for the `coin` table: one entry per published coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}
154
/// Key for the `balance` table: an (owner address, coin type) pair.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}
160
/// Value for the `coin` table: object ids related to a coin type (metadata,
/// treasury cap, regulated-coin metadata), each present only when known.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
167
/// Value for the `balance` table: a signed balance delta, combined
/// associatively by the RocksDB merge operator (`balance_delta_merge_operator`).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    // i128 so positive and negative u64-sized updates can be merged without
    // overflow; clamped to u64 on read (see `From<BalanceIndexInfo>`).
    pub balance_delta: i128,
}
172
173impl From<u64> for BalanceIndexInfo {
174    fn from(coin_value: u64) -> Self {
175        Self {
176            balance_delta: coin_value as i128,
177        }
178    }
179}
180
impl BalanceIndexInfo {
    /// Accumulates another delta into this one (used by the merge operator).
    fn merge_delta(&mut self, other: &Self) {
        self.balance_delta += other.balance_delta;
    }
}
186
187impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
188    fn from(index_info: BalanceIndexInfo) -> Self {
189        // Note: We represent balance deltas as i128 to simplify merging positive and negative updates.
190        // Be aware: Move doesn’t enforce a one-time-witness (OTW) pattern when creating a Supply<T>.
191        // Anyone can call `sui::balance::create_supply` and mint unbounded supply, potentially pushing
192        // total balances over u64::MAX. To avoid crashing the indexer, we clamp the merged value instead
193        // of panicking on overflow. This has the unfortunate consequence of making bugs in the index
194        // harder to detect, but is a necessary trade-off to avoid creating a DOS attack vector.
195        let balance = index_info.balance_delta.clamp(0, u64::MAX as i128) as u64;
196        sui_types::storage::BalanceInfo { balance }
197    }
198}
199
/// Key for the `package_version` table: (original package id, version).
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}
205
/// Value for the `package_version` table: the storage id of that version.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}
210
/// Optional configuration applied when opening the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Compaction filter used to prune old entries from `events_by_stream`.
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}
215
216fn default_table_options() -> typed_store::rocks::DBOptions {
217    typed_store::rocks::default_db_options().disable_write_throttling()
218}
219
220fn events_table_options(
221    compaction_filter: Option<EventsCompactionFilter>,
222) -> typed_store::rocks::DBOptions {
223    let mut options = default_table_options();
224    if let Some(filter) = compaction_filter {
225        options.options.set_compaction_filter(
226            "events_by_stream",
227            move |_, key, value| match filter.filter(key, value) {
228                Ok(decision) => decision,
229                Err(e) => {
230                    warn!(
231                        "Failed to parse event key during compaction: {}, key: {:?}",
232                        e, key
233                    );
234                    Decision::Remove
235                }
236            },
237        );
238    }
239    options
240}
241
242fn balance_delta_merge_operator(
243    _key: &[u8],
244    existing_val: Option<&[u8]>,
245    operands: &MergeOperands,
246) -> Option<Vec<u8>> {
247    let mut result = existing_val
248        .map(|v| {
249            bcs::from_bytes::<BalanceIndexInfo>(v)
250                .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.")
251        })
252        .unwrap_or_default();
253
254    for operand in operands.iter() {
255        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
256            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.");
257        result.merge_delta(&delta);
258    }
259    Some(
260        bcs::to_bytes(&result)
261            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
262    )
263}
264
265fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
266    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
267        Ok(info) => info,
268        Err(_) => return Decision::Keep,
269    };
270
271    if balance_info.balance_delta == 0 {
272        Decision::Remove
273    } else {
274        Decision::Keep
275    }
276}
277
278fn balance_table_options() -> typed_store::rocks::DBOptions {
279    default_table_options()
280        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
281        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
282}
283
284impl CoinIndexInfo {
285    fn merge(&mut self, other: Self) {
286        self.coin_metadata_object_id = self
287            .coin_metadata_object_id
288            .or(other.coin_metadata_object_id);
289        self.regulated_coin_metadata_object_id = self
290            .regulated_coin_metadata_object_id
291            .or(other.regulated_coin_metadata_object_id);
292        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
293    }
294}
295
/// RocksDB tables for the RpcIndexStore
///
/// Anytime a new table is added, or an existing one has its schema changed, make sure to also
/// update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that store metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still be empty post
    ///   initialization)
    /// - version of the DB. Everytime a new table or schema is changed the version number needs to
    ///   be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in the event that the
    /// node was running with indexes enabled, then run for a period of time with indexes disabled,
    /// and then run with them enabled again so that the tables can be reinitialized.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a specific user
    /// account.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by a particular
    /// ObjectID.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the ObjectID of its
    /// corresponding CoinMetadata.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// An index of Balances.
    ///
    /// Allows looking up balances by owner address and coin type.
    /// Uses a merge operator (`balance_delta_merge_operator`) plus a compaction
    /// filter that drops zero deltas — see `balance_table_options`.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// An index of Package versions.
    ///
    /// Maps original package ID and version to the storage ID of that version.
    /// Allows efficient listing of all versions of a package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Authenticated events index by (stream_id, checkpoint_seq, transaction_idx, event_index)
    ///
    /// Table options (including the optional pruning compaction filter) are
    /// supplied at open time via `events_table_options` — see
    /// `open_with_index_options`.
    events_by_stream: DBMap<EventIndexKey, ()>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}
377
/// Key for the `events_by_stream` table.
///
/// The derived ordering groups events by stream, then checkpoint, then the
/// accumulator version they settled into, then position within the checkpoint.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    /// The accumulator version that this event is settled into
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}
387
/// Compaction filter for automatic pruning of old authenticated events during RocksDB compaction.
#[derive(Clone)]
pub struct EventsCompactionFilter {
    // Shared watermark: entries at or below this checkpoint are removed.
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}
393
394impl EventsCompactionFilter {
395    pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
396        Self { pruning_watermark }
397    }
398
399    pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
400        let event_key: EventIndexKey = bcs::from_bytes(key)?;
401        let watermark = self
402            .pruning_watermark
403            .load(std::sync::atomic::Ordering::Relaxed);
404
405        if event_key.checkpoint_seq <= watermark {
406            Ok(Decision::Remove)
407        } else {
408            Ok(Decision::Keep)
409        }
410    }
411}
412
413impl IndexStoreTables {
414    fn extract_version_if_package(
415        object: &Object,
416    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
417        if let Data::Package(package) = &object.data {
418            let original_id = package.original_package_id();
419            let version = package.version().value();
420            let storage_id = object.id();
421
422            let key = PackageVersionKey {
423                original_package_id: original_id,
424                version,
425            };
426            let info = PackageVersionInfo { storage_id };
427            return Some((key, info));
428        }
429        None
430    }
431
432    fn open_with_index_options<P: Into<PathBuf>>(
433        path: P,
434        index_options: IndexStoreOptions,
435    ) -> Self {
436        let mut table_options = std::collections::BTreeMap::new();
437        table_options.insert("balance".to_string(), balance_table_options());
438        table_options.insert(
439            "events_by_stream".to_string(),
440            events_table_options(index_options.events_compaction_filter),
441        );
442
443        IndexStoreTables::open_tables_read_write_with_deprecation_option(
444            path.into(),
445            MetricConf::new("rpc-index"),
446            None,
447            Some(DBMapTableConfigMap::new(table_options)),
448            true, // remove deprecated tables
449        )
450    }
451
452    fn open_with_options<P: Into<PathBuf>>(
453        path: P,
454        options: typed_store::rocksdb::Options,
455        table_options: Option<DBMapTableConfigMap>,
456    ) -> Self {
457        IndexStoreTables::open_tables_read_write_with_deprecation_option(
458            path.into(),
459            MetricConf::new("rpc-index"),
460            Some(options),
461            table_options,
462            true, // remove deprecated tables
463        )
464    }
465
466    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
467        (match self.meta.get(&()) {
468            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
469            Ok(None) => true,
470            Err(_) => true,
471        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
472    }
473
474    // Check if the index watermark is behind the highets_executed watermark.
475    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
476        let highest_executed_checkpint = checkpoint_store
477            .get_highest_executed_checkpoint_seq_number()
478            .ok()
479            .flatten();
480        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
481        watermark < highest_executed_checkpint
482    }
483
    /// Initialize the RPC indexes.
    ///
    /// Backfills epoch metadata for all checkpoints that still have both
    /// contents and objects available, ensures the current epoch entry is
    /// populated, indexes the live object set, and finally records the
    /// indexed watermark and the current DB version.
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        let highest_executed_checkpint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        // Lowest checkpoint whose contents haven't been pruned (pruned + 1).
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Doing backfill requires processing objects so we have to restrict our backfill range
        // to the range of checkpoints that we have objects for.
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        // No range at all if nothing has been executed yet (fresh node).
        let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
            lowest_available_checkpoint..=highest_executed_checkpint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_checkpoints(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        // Only index live objects if genesis checkpoint has been executed.
        // If genesis hasn't been executed yet, the objects will be properly indexed
        // as checkpoints are processed through the normal checkpoint execution path.
        if highest_executed_checkpint.is_some() {
            // Coin metadata discovered by the parallel indexers is accumulated
            // here and written with a single multi-insert below.
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpint.unwrap_or(0),
        )?;

        // NOTE(review): the metadata record is written last — presumably so an
        // interrupted init fails `needs_to_do_initialization` and is redone;
        // confirm before relying on this ordering.
        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }
563
564    #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
565    fn index_existing_checkpoints(
566        &mut self,
567        authority_store: &AuthorityStore,
568        checkpoint_store: &CheckpointStore,
569        checkpoint_range: std::ops::RangeInclusive<u64>,
570        rpc_config: &sui_config::RpcConfig,
571    ) -> Result<(), StorageError> {
572        info!(
573            "Indexing {} checkpoints in range {checkpoint_range:?}",
574            checkpoint_range.size_hint().0
575        );
576        let start_time = Instant::now();
577
578        checkpoint_range.into_par_iter().try_for_each(|seq| {
579            let load_events = rpc_config.authenticated_events_indexing();
580            let Some(checkpoint_data) = sparse_checkpoint_data_for_epoch_backfill(
581                authority_store,
582                checkpoint_store,
583                seq,
584                load_events,
585            )?
586            else {
587                return Ok(());
588            };
589
590            let mut batch = self.epochs.batch();
591
592            self.index_epoch(&checkpoint_data, &mut batch)?;
593
594            batch
595                .write_opt(bulk_ingestion_write_options())
596                .map_err(StorageError::from)
597        })?;
598
599        info!(
600            "Indexing checkpoints took {} seconds",
601            start_time.elapsed().as_secs()
602        );
603        Ok(())
604    }
605
606    /// Prune data from this Index
607    fn prune(
608        &self,
609        pruned_checkpoint_watermark: u64,
610        _checkpoint_contents_to_prune: &[CheckpointContents],
611    ) -> Result<(), TypedStoreError> {
612        let mut batch = self.watermark.batch();
613
614        batch.insert_batch(
615            &self.watermark,
616            [(Watermark::Pruned, pruned_checkpoint_watermark)],
617        )?;
618
619        batch.write()
620    }
621
622    /// Index a Checkpoint
623    fn index_checkpoint(
624        &self,
625        checkpoint: &CheckpointData,
626        _resolver: &mut dyn LayoutResolver,
627        rpc_config: &sui_config::RpcConfig,
628    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
629        debug!(
630            checkpoint = checkpoint.checkpoint_summary.sequence_number,
631            "indexing checkpoint"
632        );
633
634        let mut batch = self.owner.batch();
635
636        self.index_epoch(checkpoint, &mut batch)?;
637        self.index_transactions(
638            checkpoint,
639            &mut batch,
640            rpc_config.authenticated_events_indexing(),
641        )?;
642        self.index_objects(checkpoint, &mut batch)?;
643
644        batch.insert_batch(
645            &self.watermark,
646            [(
647                Watermark::Indexed,
648                checkpoint.checkpoint_summary.sequence_number,
649            )],
650        )?;
651
652        debug!(
653            checkpoint = checkpoint.checkpoint_summary.sequence_number,
654            "finished indexing checkpoint"
655        );
656
657        Ok(batch)
658    }
659
660    fn extract_accumulator_version(
661        &self,
662        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
663    ) -> Option<u64> {
664        let TransactionKind::ProgrammableSystemTransaction(pt) =
665            tx.transaction.transaction_data().kind()
666        else {
667            return None;
668        };
669
670        if pt.shared_input_objects().any(|obj| {
671            obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
672                && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
673        }) {
674            return tx.output_objects.iter().find_map(|obj| {
675                if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
676                    Some(obj.version().value())
677                } else {
678                    None
679                }
680            });
681        }
682
683        None
684    }
685
    /// Index the accumulator (authenticated) events emitted by `tx` into the
    /// `events_by_stream` table.
    ///
    /// `accumulator_version` is the accumulator-root version these events are
    /// settled into (carried over from the most recent settlement seen by the
    /// caller). Events observed without one indicate a logic error: they are
    /// reported via `debug_fatal` and skipped.
    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            // Only event-digest accumulator writes are indexed here.
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                // Events whose stream id cannot be derived are not indexed.
                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    // One key per event digest, positioned by its event index.
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }
733
    /// Index epoch metadata carried by `checkpoint`, if any.
    ///
    /// When a new epoch begins, the previous epoch's entry is also backfilled:
    /// its end timestamp and end checkpoint are derived from the new epoch's
    /// start values.
    fn index_epoch(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        // Most checkpoints carry no epoch info; nothing to do then.
        let Some(epoch_info) = checkpoint.epoch_info()? else {
            return Ok(());
        };
        if epoch_info.epoch > 0 {
            let prev_epoch = epoch_info.epoch - 1;
            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
            current_epoch.epoch = prev_epoch; // set this in case there wasn't an entry
            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
        }
        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
        Ok(())
    }
753
754    // After attempting to reindex past epochs, ensure that the current epoch is at least partially
755    // initalized
756    fn initialize_current_epoch(
757        &mut self,
758        authority_store: &AuthorityStore,
759        checkpoint_store: &CheckpointStore,
760    ) -> Result<(), StorageError> {
761        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
762            return Ok(());
763        };
764
765        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
766            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
767
768        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
769        epoch.epoch = checkpoint.epoch;
770
771        if epoch.protocol_version.is_none() {
772            epoch.protocol_version = Some(system_state.protocol_version());
773        }
774
775        if epoch.start_timestamp_ms.is_none() {
776            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
777        }
778
779        if epoch.reference_gas_price.is_none() {
780            epoch.reference_gas_price = Some(system_state.reference_gas_price());
781        }
782
783        if epoch.system_state.is_none() {
784            epoch.system_state = Some(system_state);
785        }
786
787        self.epochs.insert(&epoch.epoch, &epoch)?;
788
789        Ok(())
790    }
791
    /// Index per-transaction data for `checkpoint`: balance deltas for every
    /// transaction and, when `index_events` is set, authenticated events.
    ///
    /// Transactions are walked in reverse so a settlement's accumulator
    /// version (from `extract_accumulator_version`) is available when indexing
    /// the events of the transactions that precede it in the checkpoint.
    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        let mut current_accumulator_version: Option<u64> = None;

        // iterate in reverse order, process accumulator settlements first
        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            // Merge each (owner, coin type) delta into the balance table;
            // changes whose coin type is not a struct are skipped.
            let balance_changes = sui_types::balance_change::derive_balance_changes(
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
            )
            .into_iter()
            .filter_map(|change| {
                if let TypeTag::Struct(coin_type) = change.coin_type {
                    Some((
                        BalanceKey {
                            owner: change.address,
                            coin_type: *coin_type,
                        },
                        BalanceIndexInfo {
                            balance_delta: change.amount,
                        },
                    ))
                } else {
                    None
                }
            });
            batch.partial_merge_batch(&self.balance, balance_changes)?;

            if index_events {
                // A settlement updates the version used by all earlier
                // transactions in this (reversed) walk.
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }
843
    /// Index object-level data for every transaction in `checkpoint`: the owner
    /// index, the dynamic-field index, the coin-metadata index, and the
    /// package-version index. All writes are staged into `batch`.
    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            // determine changes from removed objects
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    // Address-owned objects come out of the owner index.
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    // Object-owned entries come out of the dynamic-field index,
                    // keyed by (parent id, removed object id).
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    // Shared and immutable objects are not tracked by these indexes.
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            // determine changes from changed objects
            for (object, old_object) in tx.changed_objects() {
                // First clear index entries recorded for the previous version of
                // the object, then record entries for the new version below.
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            // Only delete the dynamic-field entry if ownership
                            // actually changed; an unchanged parent keeps its entry.
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        // Only `Field<_, _>` objects belong in the dynamic-field index.
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                // Record new package versions for the package-version index.
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // coin indexing
            //
            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are created in
            // the same transaction so we don't need to worry about overriding any older value
            // that may exist in the database (because there necessarily cannot be).
            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                // Merge partial CoinIndexInfo entries (metadata / treasury /
                // regulated metadata may be created by the same transaction).
                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
935
    /// Fetch the indexed `EpochInfo` record for `epoch`, if one exists.
    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.epochs.get(&epoch)
    }
939
940    fn event_iter(
941        &self,
942        stream_id: SuiAddress,
943        start_checkpoint: u64,
944        start_accumulator_version: u64,
945        start_transaction_idx: u32,
946        start_event_idx: u32,
947        end_checkpoint: u64,
948        limit: u32,
949    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
950    {
951        let lower = EventIndexKey {
952            stream_id,
953            checkpoint_seq: start_checkpoint,
954            accumulator_version: start_accumulator_version,
955            transaction_idx: start_transaction_idx,
956            event_index: start_event_idx,
957        };
958        let upper = EventIndexKey {
959            stream_id,
960            checkpoint_seq: end_checkpoint,
961            accumulator_version: u64::MAX,
962            transaction_idx: u32::MAX,
963            event_index: u32::MAX,
964        };
965
966        Ok(self
967            .events_by_stream
968            .safe_iter_with_bounds(Some(lower), Some(upper))
969            .map(|res| res.map(|(k, _)| k))
970            .take(limit as usize))
971    }
972
973    fn owner_iter(
974        &self,
975        owner: SuiAddress,
976        object_type: Option<StructTag>,
977        cursor: Option<OwnerIndexKey>,
978    ) -> Result<
979        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
980        TypedStoreError,
981    > {
982        // TODO can we figure out how to pass a raw byte array as a cursor?
983        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
984            owner,
985            object_type: object_type
986                .clone()
987                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
988            inverted_balance: None,
989            object_id: ObjectID::ZERO,
990        });
991
992        Ok(self
993            .owner
994            .safe_iter_with_bounds(Some(lower_bound), None)
995            .take_while(move |item| {
996                // If there's an error let if flow through
997                let Ok((key, _)) = item else {
998                    return true;
999                };
1000
1001                // Only take if owner matches
1002                key.owner == owner
1003                    // and if an object type was supplied that the type matches
1004                    && object_type
1005                        .as_ref()
1006                        .map(|ty| {
1007                            ty.address == key.object_type.address
1008                                && ty.module == key.object_type.module
1009                                && ty.name == key.object_type.name
1010                                // If type_params are not provided then we match all params
1011                                && (ty.type_params.is_empty() ||
1012                                    // If they are provided the type params must match
1013                                    ty.type_params == key.object_type.type_params)
1014                        }).unwrap_or(true)
1015            }))
1016    }
1017
1018    fn dynamic_field_iter(
1019        &self,
1020        parent: ObjectID,
1021        cursor: Option<ObjectID>,
1022    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1023    {
1024        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1025        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1026        let iter = self
1027            .dynamic_field
1028            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1029            .map_ok(|(key, ())| key);
1030        Ok(iter)
1031    }
1032
1033    fn get_coin_info(
1034        &self,
1035        coin_type: &StructTag,
1036    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1037        let key = CoinIndexKey {
1038            coin_type: coin_type.to_owned(),
1039        };
1040        self.coin.get(&key)
1041    }
1042
1043    fn get_balance(
1044        &self,
1045        owner: &SuiAddress,
1046        coin_type: &StructTag,
1047    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1048        let key = BalanceKey {
1049            owner: owner.to_owned(),
1050            coin_type: coin_type.to_owned(),
1051        };
1052        self.balance.get(&key)
1053    }
1054
1055    fn balance_iter(
1056        &self,
1057        owner: SuiAddress,
1058        cursor: Option<BalanceKey>,
1059    ) -> Result<
1060        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1061        TypedStoreError,
1062    > {
1063        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1064            owner,
1065            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1066        });
1067
1068        Ok(self
1069            .balance
1070            .safe_iter_with_bounds(Some(lower_bound), None)
1071            .scan((), move |_, item| {
1072                match item {
1073                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1074                    Ok(_) => None,          // Different owner, stop iteration
1075                    Err(e) => Some(Err(e)), // Propagate error
1076                }
1077            }))
1078    }
1079
1080    fn package_versions_iter(
1081        &self,
1082        original_id: ObjectID,
1083        cursor: Option<u64>,
1084    ) -> Result<
1085        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1086        TypedStoreError,
1087    > {
1088        let lower_bound = PackageVersionKey {
1089            original_package_id: original_id,
1090            version: cursor.unwrap_or(0),
1091        };
1092        let upper_bound = PackageVersionKey {
1093            original_package_id: original_id,
1094            version: u64::MAX,
1095        };
1096
1097        Ok(self
1098            .package_version
1099            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1100    }
1101}
1102
/// RocksDB-backed secondary indexes used to serve RPC queries.
pub struct RpcIndexStore {
    // Underlying index tables (owner, balance, coin, events, ...).
    tables: IndexStoreTables,
    // Batches staged by `index_checkpoint`, keyed by checkpoint sequence number
    // and committed in order by `commit_update_for_checkpoint`.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}
1108
impl RpcIndexStore {
    /// Given the provided directory, construct the path to the db
    fn db_path(dir: &Path) -> PathBuf {
        dir.join("rpc-index")
    }

    /// Open (and, if needed, rebuild) the RPC index db under `dir`, installing an
    /// events compaction filter driven by `pruning_watermark`. Delegates to
    /// `new_with_options`.
    pub async fn new(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let events_filter = EventsCompactionFilter::new(pruning_watermark);
        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(events_filter),
        };

        Self::new_with_options(
            dir,
            authority_store,
            checkpoint_store,
            epoch_store,
            package_store,
            index_options,
            rpc_config,
        )
        .await
    }

    /// Open the RPC index db under `dir` with the provided `index_options`.
    ///
    /// If the tables are uninitialized or at an older schema version, the existing
    /// db is destroyed and rebuilt from scratch using bulk-load-tuned RocksDB
    /// options, flushed, and then reopened with the regular options.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            // If the index tables are uninitialized or on an older version then we need to
            // populate them
            if tables.needs_to_do_initialization(checkpoint_store) {
                let batch_size_limit;

                let mut tables = {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Open the empty DB with `unordered_write`s enabled in order to get a ~3x
                    // speedup when indexing
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Allow CPU-intensive flushing operations to use all CPUs.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // We are disabling compaction for all column families below. This means we can
                    // also disable the backpressure that slows down writes when the number of L0
                    // files builds up since we will never compact them anyway.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // This is an upper bound on the amount of RAM the memtables can use across
                    // all column families.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        // Default to 80% of system RAM
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Create column family specific options.
                    let mut table_config_map = BTreeMap::new();

                    // Create options with compactions disabled and large write buffers.
                    // Each CF can use up to 25% of system RAM, but total is still limited by
                    // set_db_write_buffer_size configured above.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Calculate buffer configuration: 25% of RAM split across buffers
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB minimum

                            // Target number of buffers based on CPU count
                            // More CPUs = more parallel flushing capability
                            let target_buffer_count = num_cpus::get().max(2);

                            // Aim for CPU-based buffer count, but reduce if it would make buffers too small
                            //   For example:
                            // - 128GB RAM, 32 CPUs: 32GB per CF / 32 buffers = 1GB each
                            // - 16GB RAM, 8 CPUs: 4GB per CF / 8 buffers = 512MB each
                            // - 4GB RAM, 8 CPUs: 1GB per CF / 64MB min = ~16 buffers of 64MB each
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Calculate batch size limit: default to half the buffer size or 128MB, whichever is smaller
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27; // 128MB
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply cf_options to all tables
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // Override Balance options with the merge operator
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                // Flush all data to disk before dropping tables.
                // This is critical because WAL is disabled during bulk indexing.
                // Note we only need to call flush on one table because all tables share the same
                // underlying database.
                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                // Wait (up to 30s) for every outstanding reference to the underlying
                // db handle to drop so it can be reopened cleanly.
                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen the DB with default options (eg without `unordered_write`s enabled)
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: verify the database version was persisted correctly
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }

    /// Open the RPC index db under `dir` with default options, skipping the
    /// initialization / schema-version check performed by `new`.
    pub fn new_without_init(dir: &Path) -> Self {
        let path = Self::db_path(dir);
        let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config: sui_config::RpcConfig::default(),
        }
    }

    /// Prune indexed data for the provided checkpoint contents; delegates to
    /// `IndexStoreTables::prune`.
    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }

    /// Index a checkpoint and stage the index updates in `pending_updates`.
    ///
    /// Updates will not be committed to the database until `commit_update_for_checkpoint` is
    /// called.
    #[tracing::instrument(
        skip_all,
        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
    )]
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver, &self.rpc_config)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

    /// Commits the pending updates for the provided checkpoint number.
    ///
    /// Invariants:
    /// - `index_checkpoint` must have been called for the provided checkpoint
    /// - Callers of this function must ensure that it is called for each checkpoint in sequential
    ///   order. This will panic if the provided checkpoint does not match the expected next
    ///   checkpoint to commit.
    #[tracing::instrument(skip(self))]
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        // It's expected that the next batch exists
        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    /// Fetch the indexed `EpochInfo` for `epoch`, if present.
    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }

    /// Iterate objects owned by `owner`, optionally filtered by `object_type`,
    /// resuming from `cursor`.
    pub fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, object_type, cursor)
    }

    /// Iterate dynamic-field keys under `parent`, resuming from `cursor`.
    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    /// Look up the coin-related object ids recorded for `coin_type`.
    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }

    /// Fetch the aggregated balance entry for (`owner`, `coin_type`), if any.
    pub fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        self.tables.get_balance(owner, coin_type)
    }

    /// Iterate balance entries for `owner`, resuming from `cursor`.
    pub fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.balance_iter(owner, cursor)
    }

    /// Iterate versions of the package with original id `original_id`, resuming
    /// from `cursor`.
    pub fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.package_versions_iter(original_id, cursor)
    }

    /// Iterate event-index keys for `stream_id` between the given cursor position
    /// and `end_checkpoint` (inclusive), yielding at most `limit` keys.
    pub fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.event_iter(
            stream_id,
            start_checkpoint,
            start_accumulator_version,
            start_transaction_idx,
            start_event_idx,
            end_checkpoint,
            limit,
        )
    }

    /// Return the sequence number of the most recently indexed checkpoint, if any.
    pub fn get_highest_indexed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables.watermark.get(&Watermark::Indexed)
    }
}
1538
1539fn should_index_dynamic_field(object: &Object) -> bool {
1540    // Skip any objects that aren't of type `Field<Name, Value>`
1541    //
1542    // All dynamic fields are of type:
1543    //   - Field<Name, Value> for dynamic fields
1544    //   - Field<Wrapper<Name>, ID>> for dynamic field objects where the ID is the id of the pointed
1545    //   to object
1546    //
1547    object
1548        .data
1549        .try_as_move()
1550        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
1551}
1552
1553fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
1554    use sui_types::coin::CoinMetadata;
1555    use sui_types::coin::RegulatedCoinMetadata;
1556    use sui_types::coin::TreasuryCap;
1557
1558    let object_type = object.type_().and_then(MoveObjectType::other)?;
1559
1560    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
1561        return Some((
1562            CoinIndexKey { coin_type },
1563            CoinIndexInfo {
1564                coin_metadata_object_id: Some(object.id()),
1565                treasury_object_id: None,
1566                regulated_coin_metadata_object_id: None,
1567            },
1568        ));
1569    }
1570
1571    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
1572        return Some((
1573            CoinIndexKey { coin_type },
1574            CoinIndexInfo {
1575                coin_metadata_object_id: None,
1576                treasury_object_id: Some(object.id()),
1577                regulated_coin_metadata_object_id: None,
1578            },
1579        ));
1580    }
1581
1582    if let Some(coin_type) =
1583        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
1584    {
1585        return Some((
1586            CoinIndexKey { coin_type },
1587            CoinIndexInfo {
1588                coin_metadata_object_id: None,
1589                treasury_object_id: None,
1590                regulated_coin_metadata_object_id: Some(object.id()),
1591            },
1592        ));
1593    }
1594
1595    None
1596}
1597
/// Factory for per-worker live-object indexers used when indexing the live
/// object set in parallel (see `ParMakeLiveObjectIndexer`).
struct RpcParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    // Coin metadata/treasury discoveries are merged into this map shared
    // across all workers.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // A worker's write batch is flushed to the DB once it exceeds this many
    // bytes, bounding per-worker memory use.
    batch_size_limit: usize,
}
1603
/// Per-worker state for indexing a slice of the live object set.
struct RpcLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    // In-flight write batch; flushed whenever it grows past `batch_size_limit`.
    batch: typed_store::rocks::DBBatch,
    // Shared (cross-worker) accumulator for coin index entries.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Locally accumulated balance deltas, periodically merged into the
    // balance table via `partial_merge_batch`.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    batch_size_limit: usize,
}
1611
1612impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
1613    type ObjectIndexer = RpcLiveObjectIndexer<'a>;
1614
1615    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
1616        RpcLiveObjectIndexer {
1617            tables: self.tables,
1618            batch: self.tables.owner.batch(),
1619            coin_index: self.coin_index,
1620            balance_changes: HashMap::new(),
1621            batch_size_limit: self.batch_size_limit,
1622        }
1623    }
1624}
1625
impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
    /// Indexes a single live object, dispatching on its ownership kind.
    ///
    /// Writes are staged into `self.batch`; once the batch grows past
    /// `self.batch_size_limit` it is written to the DB so memory held while
    /// walking the full live object set stays bounded. Balance deltas are
    /// accumulated separately in `self.balance_changes` and merged into the
    /// balance table once the map reaches `BALANCE_FLUSH_THRESHOLD` entries.
    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
        match object.owner {
            // Owner Index
            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
                let owner_key = OwnerIndexKey::from_object(&object);
                let owner_info = OwnerIndexInfo::new(&object);
                self.batch
                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;

                // Coin objects also contribute a (owner, coin type) balance
                // delta to the balance index.
                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
                    let balance_key = BalanceKey { owner, coin_type };
                    let balance_info = BalanceIndexInfo::from(value);
                    self.balance_changes
                        .entry(balance_key)
                        .or_default()
                        .merge_delta(&balance_info);

                    // Drain accumulated deltas into the batch as merge
                    // operations once the map gets large.
                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
                        self.batch.partial_merge_batch(
                            &self.tables.balance,
                            std::mem::take(&mut self.balance_changes),
                        )?;
                    }
                }
            }

            // Dynamic Field Index
            Owner::ObjectOwner(parent) => {
                if should_index_dynamic_field(&object) {
                    let field_key = DynamicFieldKey::new(parent, object.id());
                    self.batch
                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
                }

                // Index address balances
                // Children of the accumulator root object carry per-address
                // balance accumulator values.
                if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
                    && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
                {
                    let balance_key = BalanceKey { owner, coin_type };
                    let balance_info = BalanceIndexInfo {
                        balance_delta: balance,
                    };
                    self.balance_changes
                        .entry(balance_key)
                        .or_default()
                        .merge_delta(&balance_info);

                    // NOTE(review): this flush block duplicates the one in the
                    // AddressOwner arm above; a shared helper would be nicer
                    // but the trait impl cannot hold extra methods.
                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
                        self.batch.partial_merge_batch(
                            &self.tables.balance,
                            std::mem::take(&mut self.balance_changes),
                        )?;
                    }
                }
            }

            // Shared and immutable objects are not owner- or field-indexed.
            Owner::Shared { .. } | Owner::Immutable => {}
        }

        // Look for CoinMetadata<T> and TreasuryCap<T> objects
        if let Some((key, value)) = try_create_coin_index_info(&object) {
            use std::collections::hash_map::Entry;

            // Merge into the cross-worker coin index under the shared lock;
            // the same coin type can be discovered by multiple workers.
            match self.coin_index.lock().unwrap().entry(key) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge(value);
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }

        // Record package versions for package objects.
        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
            self.batch
                .insert_batch(&self.tables.package_version, [(key, info)])?;
        }

        // If the batch size grows to greater than the limit then write out to the DB so that the
        // data we need to hold in memory doesn't grow unbounded.
        if self.batch.size_in_bytes() >= self.batch_size_limit {
            std::mem::replace(&mut self.batch, self.tables.owner.batch())
                .write_opt(bulk_ingestion_write_options())?;
        }

        Ok(())
    }

    /// Flushes any remaining balance deltas into the batch and writes the
    /// final batch to the DB.
    fn finish(mut self) -> Result<(), StorageError> {
        self.batch.partial_merge_batch(
            &self.tables.balance,
            std::mem::take(&mut self.balance_changes),
        )?;
        self.batch.write_opt(bulk_ingestion_write_options())?;
        Ok(())
    }
}
1724
1725// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit of refactoring to
1726// make it possible.
1727fn sparse_checkpoint_data_for_epoch_backfill(
1728    authority_store: &AuthorityStore,
1729    checkpoint_store: &CheckpointStore,
1730    checkpoint: u64,
1731    load_events: bool,
1732) -> Result<Option<CheckpointData>, StorageError> {
1733    use sui_types::full_checkpoint_content::CheckpointTransaction;
1734
1735    let summary = checkpoint_store
1736        .get_checkpoint_by_sequence_number(checkpoint)?
1737        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1738
1739    // Only load genesis and end of epoch checkpoints
1740    if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
1741        return Ok(None);
1742    }
1743
1744    let contents = checkpoint_store
1745        .get_checkpoint_contents(&summary.content_digest)?
1746        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1747
1748    let transaction_digests = contents
1749        .iter()
1750        .map(|execution_digests| execution_digests.transaction)
1751        .collect::<Vec<_>>();
1752    let transactions = authority_store
1753        .multi_get_transaction_blocks(&transaction_digests)?
1754        .into_iter()
1755        .map(|maybe_transaction| {
1756            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
1757        })
1758        .collect::<Result<Vec<_>, _>>()?;
1759
1760    let effects = authority_store
1761        .multi_get_executed_effects(&transaction_digests)?
1762        .into_iter()
1763        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
1764        .collect::<Result<Vec<_>, _>>()?;
1765
1766    let events = if load_events {
1767        authority_store
1768            .multi_get_events(&transaction_digests)
1769            .map_err(|e| StorageError::custom(e.to_string()))?
1770    } else {
1771        vec![None; transaction_digests.len()]
1772    };
1773
1774    let mut full_transactions = Vec::with_capacity(transactions.len());
1775    for ((tx, fx), ev) in transactions.into_iter().zip(effects).zip(events) {
1776        let input_objects =
1777            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
1778        let output_objects =
1779            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
1780
1781        let full_transaction = CheckpointTransaction {
1782            transaction: tx.into(),
1783            effects: fx,
1784            events: ev,
1785            input_objects,
1786            output_objects,
1787        };
1788
1789        full_transactions.push(full_transaction);
1790    }
1791
1792    let checkpoint_data = CheckpointData {
1793        checkpoint_summary: summary.into(),
1794        checkpoint_contents: contents,
1795        transactions: full_transactions,
1796    };
1797
1798    Ok(Some(checkpoint_data))
1799}
1800
1801fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
1802    match Coin::extract_balance_if_coin(object) {
1803        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
1804        Ok(Some(_)) => {
1805            debug!("Coin object {} has non-struct type tag", object.id());
1806            Ok(None)
1807        }
1808        Ok(None) => {
1809            // Not a coin
1810            Ok(None)
1811        }
1812        Err(e) => {
1813            // Corrupted coin data
1814            Err(StorageError::custom(format!(
1815                "Failed to deserialize coin object {}: {}",
1816                object.id(),
1817                e
1818            )))
1819        }
1820    }
1821}
1822
1823fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
1824    let move_object = object.data.try_as_move()?;
1825
1826    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
1827    else {
1828        return None;
1829    };
1830
1831    let (key, value): (
1832        sui_types::accumulator_root::AccumulatorKey,
1833        sui_types::accumulator_root::AccumulatorValue,
1834    ) = move_object.try_into().ok()?;
1835
1836    let balance = value.as_u128()? as i128;
1837    if balance <= 0 {
1838        return None;
1839    }
1840
1841    Some((key.owner, *coin_type, balance))
1842}
1843
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    /// End-to-end check that `EventsCompactionFilter` prunes event-index
    /// entries at or below the pruning watermark when RocksDB compaction runs
    /// over the `events_by_stream` table.
    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        // Watermark of 5: per the filter logic (exercised directly in the
        // unit test below), entries with checkpoint_seq <= 5 are removable.
        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        // Checkpoints straddling the watermark: 1, 3, 5 are prunable; 10 and
        // 15 should survive compaction.
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        // Flush memtables so the compaction below actually visits the data.
        tables.events_by_stream.flush().unwrap();
        let mut events_before_compaction = 0;
        for result in tables.events_by_stream.safe_iter() {
            if result.is_ok() {
                events_before_compaction += 1;
            }
        }
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        // NOTE(review): the end key uses a random address, which may sort
        // below `stream_id`, so the compaction range is not guaranteed to
        // cover the inserted keys — presumably why the assertion further down
        // is only a lower bound rather than exactly 2. Confirm whether an
        // all-max end key would make this deterministic.
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        println!("Events after compaction: {}", events_after_compaction.len());
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        // NOTE(review): the remaining lines only exercise the atomic itself
        // (store then load); they do not verify any additional pruning with
        // the raised watermark.
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    /// Unit test of the filter decision itself (no RocksDB involved):
    /// keys with checkpoint_seq <= watermark are removed, keys above are kept.
    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        // Strictly below the watermark -> removed.
        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        // Strictly above the watermark -> kept.
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        // Exactly at the watermark -> removed (the boundary is inclusive).
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }
}