sui_core/
rpc_index.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::AuthorityStore;
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointStore;
use crate::par_index_live_object_set::LiveObjectIndexer;
use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
use itertools::Itertools;
use move_core_types::language_storage::{StructTag, TypeTag};
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use serde::Deserialize;
use serde::Serialize;
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
use sui_types::base_types::MoveObjectType;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SequenceNumber;
use sui_types::base_types::SuiAddress;
use sui_types::coin::Coin;
use sui_types::committee::EpochId;
use sui_types::digests::TransactionDigest;
use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_checkpoint::CheckpointContents;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Data;
use sui_types::object::Object;
use sui_types::object::Owner;
use sui_types::storage::BackingPackageStore;
use sui_types::storage::DynamicFieldKey;
use sui_types::storage::EpochInfo;
use sui_types::storage::TransactionInfo;
use sui_types::storage::error::Error as StorageError;
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::transaction::{TransactionDataAPI, TransactionKind};
use sysinfo::{MemoryRefreshKind, RefreshKind, System};
use tracing::{debug, info, warn};
use typed_store::DBMapUtils;
use typed_store::TypedStoreError;
use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
use typed_store::traits::Map;

const CURRENT_DB_VERSION: u64 = 3;
// I tried increasing this to 100k and 1M and it didn't speed up indexing at all.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;

fn bulk_ingestion_write_options() -> WriteOptions {
    let mut opts = WriteOptions::default();
    opts.disable_wal(true);
    opts
}

/// Get available memory, respecting cgroup limits in containerized environments
fn get_available_memory() -> u64 {
    // RefreshKind::nothing().with_memory() avoids collecting other, slower stats
    let mut sys = System::new_with_specifics(
        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
    );
    sys.refresh_memory();

    // Check if we have cgroup limits
    if let Some(cgroup_limits) = sys.cgroup_limits() {
        let memory_limit = cgroup_limits.total_memory;
        // cgroup_limits.total_memory is 0 when there's no limit
        if memory_limit > 0 {
            debug!("Using cgroup memory limit: {} bytes", memory_limit);
            return memory_limit;
        }
    }

    // Fall back to system memory if no cgroup limits found
    // sysinfo 0.35 already reports bytes (not KiB like older versions)
    let total_memory_bytes = sys.total_memory();
    debug!("Using system memory: {} bytes", total_memory_bytes);
    total_memory_bytes
}
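
// Illustrative sketch (not part of the index logic): how the value returned
// above feeds the bulk-indexing memory budgets chosen in
// `RpcIndexStore::new_with_options` below, where by default 80% of available
// memory bounds all memtables and each column family gets a 25% budget.
#[cfg(test)]
mod memory_budget_example {
    use super::get_available_memory;

    #[test]
    fn derives_indexing_memory_budgets() {
        let total = get_available_memory();
        // Same arithmetic as the defaults in `new_with_options`.
        let db_write_buffer = (total as f64 * 0.8) as usize;
        let per_cf_budget = (total as f64 * 0.25) as usize;
        assert!(per_cf_budget <= db_write_buffer);
    }
}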

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}

/// Checkpoint watermark type
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    Indexed,
    Pruned,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    pub object_type: StructTag,

    // If this object is coin-like (e.g. 0x2::coin::Coin) then this is the coin's balance,
    // bitwise-inverted (`!coin.balance`) in order to force coins to sort from greatest to least
    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}

impl OwnerIndexKey {
    // Creates a key from the provided object.
    // Panics if the provided object is not an address-owned object.
    fn from_object(object: &Object) -> Self {
        let owner = match object.owner() {
            Owner::AddressOwner(owner) => owner,
            Owner::ConsensusAddressOwner { owner, .. } => owner,
            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
        };
        let object_type = object.struct_tag().expect("packages cannot be owned");

        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());

        Self {
            owner: *owner,
            object_type,
            inverted_balance,
            object_id: object.id(),
        }
    }
}
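
// Illustrative sketch of why `inverted_balance` stores `!coin.balance`:
// bitwise NOT reverses the natural u64 ordering, so richer coins produce
// smaller keys and are therefore returned first when iterating the `owner`
// table.
#[cfg(test)]
mod inverted_balance_example {
    #[test]
    fn inverted_balances_sort_greatest_first() {
        let rich = 1_000_000u64;
        let poor = 10u64;
        assert!(!rich < !poor); // larger balance => smaller (earlier) key
    }
}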

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // object_id and type of this object are a part of the key
    pub version: SequenceNumber,
}

impl OwnerIndexInfo {
    pub fn new(object: &Object) -> Self {
        Self {
            version: object.version(),
        }
    }
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    pub balance_delta: i128,
}

impl From<u64> for BalanceIndexInfo {
    fn from(coin_value: u64) -> Self {
        Self {
            balance_delta: coin_value as i128,
        }
    }
}

impl BalanceIndexInfo {
    fn invert(self) -> Self {
        // Check for potential overflow when negating i128::MIN
        assert!(
            self.balance_delta != i128::MIN,
            "Cannot invert balance_delta: would overflow i128"
        );

        Self {
            balance_delta: -self.balance_delta,
        }
    }

    fn merge_delta(&mut self, other: &Self) {
        self.balance_delta += other.balance_delta;
    }
}
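
// Illustrative sketch of the delta arithmetic above: a removal is recorded as
// the inverted (negated) delta, so merging a deposit with its later removal
// nets out to zero.
#[cfg(test)]
mod balance_delta_example {
    use super::BalanceIndexInfo;

    #[test]
    fn deposit_and_removal_cancel() {
        let mut acc = BalanceIndexInfo::from(100u64);
        acc.merge_delta(&BalanceIndexInfo::from(100u64).invert());
        assert_eq!(acc.balance_delta, 0);
    }
}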

impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
    fn from(index_info: BalanceIndexInfo) -> Self {
        // Note: We represent balance deltas as i128 to simplify merging positive and negative updates.
        // Be aware: Move doesn't enforce a one-time-witness (OTW) pattern when creating a Supply<T>.
        // Anyone can call `sui::balance::create_supply` and mint unbounded supply, potentially pushing
        // total balances over u64::MAX. To avoid crashing the indexer, we clamp the merged value instead
        // of panicking on overflow. This has the unfortunate consequence of making bugs in the index
        // harder to detect, but is a necessary trade-off to avoid creating a DoS attack vector.
        let balance = index_info.balance_delta.clamp(0, u64::MAX as i128) as u64;
        sui_types::storage::BalanceInfo { balance }
    }
}
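
// Illustrative sketch of the clamping behavior described above: aggregate
// deltas outside the valid u64 range are clamped rather than panicking.
#[cfg(test)]
mod balance_clamp_example {
    use super::BalanceIndexInfo;
    use sui_types::storage::BalanceInfo;

    #[test]
    fn deltas_are_clamped_into_u64_range() {
        let negative = BalanceIndexInfo { balance_delta: -5 };
        assert_eq!(BalanceInfo::from(negative).balance, 0);

        let huge = BalanceIndexInfo {
            balance_delta: u64::MAX as i128 + 1,
        };
        assert_eq!(BalanceInfo::from(huge).balance, u64::MAX);
    }
}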

#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}

#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}

fn default_table_options() -> typed_store::rocks::DBOptions {
    typed_store::rocks::default_db_options().disable_write_throttling()
}

fn events_table_options(
    compaction_filter: Option<EventsCompactionFilter>,
) -> typed_store::rocks::DBOptions {
    let mut options = default_table_options();
    if let Some(filter) = compaction_filter {
        options.options.set_compaction_filter(
            "events_by_stream",
            move |_, key, value| match filter.filter(key, value) {
                Ok(decision) => decision,
                Err(e) => {
                    warn!(
                        "Failed to parse event key during compaction: {}, key: {:?}",
                        e, key
                    );
                    Decision::Remove
                }
            },
        );
    }
    options
}

fn balance_delta_merge_operator(
    _key: &[u8],
    existing_val: Option<&[u8]>,
    operands: &MergeOperands,
) -> Option<Vec<u8>> {
    let mut result = existing_val
        .map(|v| {
            bcs::from_bytes::<BalanceIndexInfo>(v)
                .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.")
        })
        .unwrap_or_default();

    for operand in operands.iter() {
        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.");
        result.merge_delta(&delta);
    }
    Some(
        bcs::to_bytes(&result)
            .expect("Failed to serialize BalanceIndexInfo - this should never fail."),
    )
}
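
// Illustrative sketch of what RocksDB does with the merge operator above:
// start from the existing BCS-encoded value (if any) and fold each operand
// into it. `MergeOperands` cannot be constructed directly in a test, so this
// reproduces the same fold over plain byte slices.
#[cfg(test)]
mod merge_operator_example {
    use super::BalanceIndexInfo;

    #[test]
    fn operands_fold_into_existing_value() {
        let existing = bcs::to_bytes(&BalanceIndexInfo { balance_delta: 10 }).unwrap();
        let operands = [
            bcs::to_bytes(&BalanceIndexInfo { balance_delta: 5 }).unwrap(),
            bcs::to_bytes(&BalanceIndexInfo { balance_delta: -3 }).unwrap(),
        ];

        let mut result: BalanceIndexInfo = bcs::from_bytes(&existing).unwrap();
        for operand in &operands {
            result.merge_delta(&bcs::from_bytes(operand).unwrap());
        }
        assert_eq!(result.balance_delta, 12);
    }
}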

fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
        Ok(info) => info,
        Err(_) => return Decision::Keep,
    };

    if balance_info.balance_delta == 0 {
        Decision::Remove
    } else {
        Decision::Keep
    }
}
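
// Illustrative sketch: once merged deltas cancel out to zero, compaction drops
// the entry entirely instead of keeping an explicit zero balance on disk.
#[cfg(test)]
mod balance_compaction_example {
    use super::{BalanceIndexInfo, balance_compaction_filter};
    use typed_store::rocksdb::compaction_filter::Decision;

    #[test]
    fn zero_deltas_are_dropped() {
        let zero = bcs::to_bytes(&BalanceIndexInfo { balance_delta: 0 }).unwrap();
        let nonzero = bcs::to_bytes(&BalanceIndexInfo { balance_delta: 7 }).unwrap();
        assert!(matches!(
            balance_compaction_filter(0, b"key", &zero),
            Decision::Remove
        ));
        assert!(matches!(
            balance_compaction_filter(0, b"key", &nonzero),
            Decision::Keep
        ));
    }
}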

fn balance_table_options() -> typed_store::rocks::DBOptions {
    default_table_options()
        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
}

impl CoinIndexInfo {
    fn merge(&mut self, other: Self) {
        self.coin_metadata_object_id = self
            .coin_metadata_object_id
            .or(other.coin_metadata_object_id);
        self.regulated_coin_metadata_object_id = self
            .regulated_coin_metadata_object_id
            .or(other.regulated_coin_metadata_object_id);
        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
    }
}
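
// Illustrative sketch of `merge` semantics: each field keeps the first Some it
// sees, so later objects in the same transaction only fill in missing fields.
#[cfg(test)]
mod coin_info_merge_example {
    use super::CoinIndexInfo;
    use sui_types::base_types::ObjectID;

    #[test]
    fn merge_fills_missing_fields_only() {
        let mut info = CoinIndexInfo {
            coin_metadata_object_id: Some(ObjectID::ZERO),
            treasury_object_id: None,
            regulated_coin_metadata_object_id: None,
        };
        info.merge(CoinIndexInfo {
            coin_metadata_object_id: Some(ObjectID::MAX),
            treasury_object_id: Some(ObjectID::MAX),
            regulated_coin_metadata_object_id: None,
        });
        assert_eq!(info.coin_metadata_object_id, Some(ObjectID::ZERO)); // kept
        assert_eq!(info.treasury_object_id, Some(ObjectID::MAX)); // filled in
    }
}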

/// RocksDB tables for the RpcIndexStore
///
/// Anytime a new table is added, or an existing one has its schema changed, make sure to also
/// update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still be empty post
    ///   initialization)
    /// - version of the DB. Every time a new table is added or a schema is changed, the version
    ///   number needs to be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in the event that the
    /// node was running with indexes enabled, then run for a period of time with indexes disabled,
    /// and then run with them enabled again so that the tables can be reinitialized.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for epochs which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a specific user
    /// account.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by a particular
    /// ObjectID.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the ObjectID of its
    /// corresponding CoinMetadata.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// An index of Balances.
    ///
    /// Allows looking up balances by owner address and coin type.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// An index of Package versions.
    ///
    /// Maps original package ID and version to the storage ID of that version.
    /// Allows efficient listing of all versions of a package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Authenticated events index by (stream_id, checkpoint_seq, accumulator_version,
    /// transaction_idx, event_index)
    events_by_stream: DBMap<EventIndexKey, ()>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    /// The accumulator version that this event is settled into
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}

/// Compaction filter for automatic pruning of old authenticated events during RocksDB compaction.
#[derive(Clone)]
pub struct EventsCompactionFilter {
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}

impl EventsCompactionFilter {
    pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
        Self { pruning_watermark }
    }

    pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
        let event_key: EventIndexKey = bcs::from_bytes(key)?;
        let watermark = self
            .pruning_watermark
            .load(std::sync::atomic::Ordering::Relaxed);

        if event_key.checkpoint_seq <= watermark {
            Ok(Decision::Remove)
        } else {
            Ok(Decision::Keep)
        }
    }
}
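
// Illustrative sketch of the filter's decision rule: entries at or below the
// pruning watermark are removed during compaction, newer entries are kept.
#[cfg(test)]
mod events_compaction_filter_example {
    use super::{EventIndexKey, EventsCompactionFilter};
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;
    use typed_store::rocksdb::compaction_filter::Decision;

    #[test]
    fn removes_entries_at_or_below_watermark() {
        let filter = EventsCompactionFilter::new(Arc::new(AtomicU64::new(100)));
        let key = |checkpoint_seq| {
            bcs::to_bytes(&EventIndexKey {
                stream_id: SuiAddress::ZERO,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .unwrap()
        };
        assert!(matches!(
            filter.filter(&key(100), &[]).unwrap(),
            Decision::Remove
        ));
        assert!(matches!(
            filter.filter(&key(101), &[]).unwrap(),
            Decision::Keep
        ));
    }
}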

impl IndexStoreTables {
    fn track_coin_balance_change(
        object: &Object,
        owner: &SuiAddress,
        is_removal: bool,
        balance_changes: &mut HashMap<BalanceKey, BalanceIndexInfo>,
    ) -> Result<(), StorageError> {
        if let Some((struct_tag, value)) = get_balance_and_type_if_coin(object)? {
            let key = BalanceKey {
                owner: *owner,
                coin_type: struct_tag,
            };

            let mut delta = BalanceIndexInfo::from(value);
            if is_removal {
                delta = delta.invert();
            }

            balance_changes.entry(key).or_default().merge_delta(&delta);
        }
        Ok(())
    }

    fn extract_version_if_package(
        object: &Object,
    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
        if let Data::Package(package) = &object.data {
            let original_id = package.original_package_id();
            let version = package.version().value();
            let storage_id = object.id();

            let key = PackageVersionKey {
                original_package_id: original_id,
                version,
            };
            let info = PackageVersionInfo { storage_id };
            return Some((key, info));
        }
        None
    }

    fn open_with_index_options<P: Into<PathBuf>>(
        path: P,
        index_options: IndexStoreOptions,
    ) -> Self {
        let mut table_options = std::collections::BTreeMap::new();
        table_options.insert("balance".to_string(), balance_table_options());
        table_options.insert(
            "events_by_stream".to_string(),
            events_table_options(index_options.events_compaction_filter),
        );

        IndexStoreTables::open_tables_read_write(
            path.into(),
            MetricConf::new("rpc-index"),
            None,
            Some(DBMapTableConfigMap::new(table_options)),
        )
    }

    fn open_with_options<P: Into<PathBuf>>(
        path: P,
        options: typed_store::rocksdb::Options,
        table_options: Option<DBMapTableConfigMap>,
    ) -> Self {
        IndexStoreTables::open_tables_read_write(
            path.into(),
            MetricConf::new("rpc-index"),
            Some(options),
            table_options,
        )
    }

    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
        (match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => true,
            Err(_) => true,
        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
    }

    // Check if the index watermark is behind the highest_executed watermark.
    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
        let highest_executed_checkpoint = checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .ok()
            .flatten();
        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
        watermark < highest_executed_checkpoint
    }

    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        let highest_executed_checkpoint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Doing backfill requires processing objects so we have to restrict our backfill range
        // to the range of checkpoints that we have objects for.
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range = highest_executed_checkpoint.map(|highest_executed_checkpoint| {
            lowest_available_checkpoint..=highest_executed_checkpoint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_transactions(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        // Only index live objects if genesis checkpoint has been executed.
        // If genesis hasn't been executed yet, the objects will be properly indexed
        // as checkpoints are processed through the normal checkpoint execution path.
        if highest_executed_checkpoint.is_some() {
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpoint.unwrap_or(0),
        )?;

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }

    #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
    fn index_existing_transactions(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        checkpoint_range: std::ops::RangeInclusive<u64>,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!(
            "Indexing {} checkpoints in range {checkpoint_range:?}",
            checkpoint_range.size_hint().0
        );
        let start_time = Instant::now();

        checkpoint_range.into_par_iter().try_for_each(|seq| {
            let load_events = rpc_config.authenticated_events_indexing();
            let checkpoint_data = sparse_checkpoint_data_for_backfill(
                authority_store,
                checkpoint_store,
                seq,
                load_events,
            )?;

            let mut batch = self.transactions.batch();

            self.index_epoch(&checkpoint_data, &mut batch)?;
            self.index_transactions(&checkpoint_data, &mut batch, load_events)?;

            batch
                .write_opt(&bulk_ingestion_write_options())
                .map_err(StorageError::from)
        })?;

        info!(
            "Indexing checkpoints took {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }

    /// Prune data from this Index
    fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.transactions.batch();

        let transactions_to_prune = checkpoint_contents_to_prune
            .iter()
            .flat_map(|contents| contents.iter().map(|digests| digests.transaction));

        batch.delete_batch(&self.transactions, transactions_to_prune)?;
        batch.insert_batch(
            &self.watermark,
            [(Watermark::Pruned, pruned_checkpoint_watermark)],
        )?;

        batch.write()
    }

    /// Index a Checkpoint
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        _resolver: &mut dyn LayoutResolver,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.transactions.batch();

        self.index_epoch(checkpoint, &mut batch)?;
        self.index_transactions(
            checkpoint,
            &mut batch,
            rpc_config.authenticated_events_indexing(),
        )?;
        self.index_objects(checkpoint, &mut batch)?;

        batch.insert_batch(
            &self.watermark,
            [(
                Watermark::Indexed,
                checkpoint.checkpoint_summary.sequence_number,
            )],
        )?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }

    fn extract_accumulator_version(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
    ) -> Option<u64> {
        let TransactionKind::ProgrammableSystemTransaction(pt) =
            tx.transaction.transaction_data().kind()
        else {
            return None;
        };

        if pt.shared_input_objects().any(|obj| {
            obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
                && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
        }) {
            return tx.output_objects.iter().find_map(|obj| {
                if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
                    Some(obj.version().value())
                } else {
                    None
                }
            });
        }

        None
    }

    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }

    fn index_epoch(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let Some(epoch_info) = checkpoint.epoch_info()? else {
            return Ok(());
        };
        if epoch_info.epoch > 0 {
            let prev_epoch = epoch_info.epoch - 1;
            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
            current_epoch.epoch = prev_epoch; // set this in case there wasn't an entry
            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
        }
        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
        Ok(())
    }

    // After attempting to reindex past epochs, ensure that the current epoch is at least partially
    // initialized
    fn initialize_current_epoch(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
    ) -> Result<(), StorageError> {
        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
            return Ok(());
        };

        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;

        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
        epoch.epoch = checkpoint.epoch;

        if epoch.protocol_version.is_none() {
            epoch.protocol_version = Some(system_state.protocol_version());
        }

        if epoch.start_timestamp_ms.is_none() {
            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
        }

        if epoch.reference_gas_price.is_none() {
            epoch.reference_gas_price = Some(system_state.reference_gas_price());
        }

        if epoch.system_state.is_none() {
            epoch.system_state = Some(system_state);
        }

        self.epochs.insert(&epoch.epoch, &epoch)?;

        Ok(())
    }

    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        let mut current_accumulator_version: Option<u64> = None;

        // iterate in reverse order, process accumulator settlements first
        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            let info = TransactionInfo::new(
                tx.transaction.transaction_data(),
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
                cp,
            );

            let digest = tx.transaction.digest();
            batch.insert_batch(&self.transactions, [(digest, info)])?;

            if index_events {
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }

    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut balance_changes: HashMap<BalanceKey, BalanceIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            // determine changes from removed objects
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
                        Self::track_coin_balance_change(
                            removed_object,
                            owner,
                            true,
                            &mut balance_changes,
                        )?;

                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            // determine changes from changed objects
            for (object, old_object) in tx.changed_objects() {
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
                            Self::track_coin_balance_change(
                                old_object,
                                owner,
                                true,
                                &mut balance_changes,
                            )?;

                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                match object.owner() {
                    Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
                        Self::track_coin_balance_change(
                            object,
                            owner,
                            false,
                            &mut balance_changes,
                        )?;
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // coin indexing
            //
            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are created in
            // the same transaction, so we don't need to worry about overriding any older value
            // that may exist in the database (because there necessarily cannot be one).
            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.partial_merge_batch(&self.balance, balance_changes)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }

    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.epochs.get(&epoch)
    }

    fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.transactions.get(digest)
    }

    fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        let lower = EventIndexKey {
            stream_id,
            checkpoint_seq: start_checkpoint,
            accumulator_version: start_accumulator_version,
            transaction_idx: start_transaction_idx,
            event_index: start_event_idx,
        };
        let upper = EventIndexKey {
            stream_id,
            checkpoint_seq: end_checkpoint,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        Ok(self
            .events_by_stream
            .safe_iter_with_bounds(Some(lower), Some(upper))
            .map(|res| res.map(|(k, _)| k))
            .take(limit as usize))
    }

    fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        // TODO can we figure out how to pass a raw byte array as a cursor?
        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
            owner,
            object_type: object_type
                .clone()
                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
            inverted_balance: None,
            object_id: ObjectID::ZERO,
        });

        Ok(self
            .owner
            .safe_iter_with_bounds(Some(lower_bound), None)
            .take_while(move |item| {
                // If there's an error let it flow through
                let Ok((key, _)) = item else {
                    return true;
                };

                // Only take if owner matches
                key.owner == owner
                    // and if an object type was supplied that the type matches
                    && object_type
                        .as_ref()
                        .map(|ty| {
                            ty.address == key.object_type.address
                                && ty.module == key.object_type.module
                                && ty.name == key.object_type.name
                                // If type_params are not provided then we match all params
                                && (ty.type_params.is_empty() ||
                                    // If they are provided the type params must match
                                    ty.type_params == key.object_type.type_params)
                        }).unwrap_or(true)
            }))
    }

    fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
        let iter = self
            .dynamic_field
            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
            .map_ok(|(key, ())| key);
        Ok(iter)
    }

    fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        let key = CoinIndexKey {
            coin_type: coin_type.to_owned(),
        };
        self.coin.get(&key)
    }

    fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        let key = BalanceKey {
            owner: owner.to_owned(),
            coin_type: coin_type.to_owned(),
        };
        self.balance.get(&key)
    }

    fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
            owner,
            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
        });

        Ok(self
            .balance
            .safe_iter_with_bounds(Some(lower_bound), None)
            .scan((), move |_, item| {
                match item {
                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
                    Ok(_) => None,          // Different owner, stop iteration
                    Err(e) => Some(Err(e)), // Propagate error
                }
            }))
    }

    fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = PackageVersionKey {
            original_package_id: original_id,
            version: cursor.unwrap_or(0),
        };
        let upper_bound = PackageVersionKey {
            original_package_id: original_id,
            version: u64::MAX,
        };

        Ok(self
            .package_version
            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
    }
}
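
// Illustrative sketches of the key-ordering assumptions behind the iterator
// bounds used above (`event_iter`, `owner_iter`, `package_versions_iter`).
// They rely only on the keys' derived lexicographic `Ord`, which mirrors how
// the index keys sort.
#[cfg(test)]
mod iter_bounds_examples {
    use super::{EventIndexKey, OwnerIndexKey, PackageVersionKey};
    use move_core_types::language_storage::StructTag;
    use sui_types::base_types::{ObjectID, SuiAddress};

    // `event_iter` saturates the trailing fields of its upper bound so it
    // covers every real event up to and including `end_checkpoint`.
    #[test]
    fn event_upper_bound_covers_end_checkpoint() {
        let key = |checkpoint_seq, event_index| EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index,
        };
        assert!(key(5, u32::MAX) > key(5, 0));
        assert!(key(5, u32::MAX) < key(6, 0));
    }

    // `owner_iter` synthesizes "0x0::a::a" as a near-minimal StructTag so the
    // lower bound sorts at or before any real key for the same owner.
    #[test]
    fn owner_lower_bound_precedes_real_keys() {
        let owner = SuiAddress::ZERO;
        let lower = OwnerIndexKey {
            owner,
            object_type: "0x0::a::a".parse::<StructTag>().unwrap(),
            inverted_balance: None,
            object_id: ObjectID::ZERO,
        };
        let real = OwnerIndexKey {
            owner,
            object_type: "0x2::coin::Coin".parse::<StructTag>().unwrap(),
            inverted_balance: Some(!100u64),
            object_id: ObjectID::ZERO,
        };
        assert!(lower <= real);
    }

    // `package_versions_iter` bounds versions of a single original package.
    #[test]
    fn package_version_bounds_cover_one_package() {
        let id = ObjectID::ZERO;
        let at = |version| PackageVersionKey {
            original_package_id: id,
            version,
        };
        assert!(at(0) <= at(42) && at(42) <= at(u64::MAX));
    }
}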

pub struct RpcIndexStore {
    tables: IndexStoreTables,
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}

impl RpcIndexStore {
    /// Given the provided directory, construct the path to the db
    fn db_path(dir: &Path) -> PathBuf {
        dir.join("rpc-index")
    }

    pub async fn new(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let events_filter = EventsCompactionFilter::new(pruning_watermark);
        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(events_filter),
        };

        Self::new_with_options(
            dir,
            authority_store,
            checkpoint_store,
            epoch_store,
            package_store,
            index_options,
            rpc_config,
        )
        .await
    }
1190
1191    pub async fn new_with_options(
1192        dir: &Path,
1193        authority_store: &AuthorityStore,
1194        checkpoint_store: &CheckpointStore,
1195        epoch_store: &AuthorityPerEpochStore,
1196        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1197        index_options: IndexStoreOptions,
1198        rpc_config: sui_config::RpcConfig,
1199    ) -> Self {
1200        let path = Self::db_path(dir);
1201        let index_config = rpc_config.index_initialization_config();
1202
1203        let tables = {
1204            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());
1205
1206            // If the index tables are uninitialized or on an older version then we need to
1207            // populate them
1208            if tables.needs_to_do_initialization(checkpoint_store) {
1209                let batch_size_limit;
1210
1211                let mut tables = {
1212                    drop(tables);
1213                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
1214                        .await
1215                        .expect("unable to destroy old rpc-index db");
1216
1217                    // Open the empty DB with `unordered_write`s enabled in order to get a ~3x
1218                    // speedup when indexing
1219                    let mut options = typed_store::rocksdb::Options::default();
1220                    options.set_unordered_write(true);
1221
1222                    // Allow CPU-intensive flushing operations to use all CPUs.
1223                    let max_background_jobs = if let Some(jobs) =
1224                        index_config.as_ref().and_then(|c| c.max_background_jobs)
1225                    {
1226                        debug!("Using config override for max_background_jobs: {}", jobs);
1227                        jobs
1228                    } else {
1229                        let jobs = num_cpus::get() as i32;
1230                        debug!(
1231                            "Calculated max_background_jobs: {} (based on CPU count)",
1232                            jobs
1233                        );
1234                        jobs
1235                    };
1236                    options.set_max_background_jobs(max_background_jobs);
1237
1238                    // We are disabling compaction for all column families below. This means we can
1239                    // also disable the backpressure that slows down writes when the number of L0
1240                    // files builds up since we will never compact them anyway.
1241                    options.set_level_zero_file_num_compaction_trigger(0);
1242                    options.set_level_zero_slowdown_writes_trigger(-1);
1243                    options.set_level_zero_stop_writes_trigger(i32::MAX);
1244
1245                    let total_memory_bytes = get_available_memory();
1246                    // This is an upper bound on the amount to of ram the memtables can use across
1247                    // all column families.
1248                    let db_buffer_size = if let Some(size) =
1249                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
1250                    {
1251                        debug!(
1252                            "Using config override for db_write_buffer_size: {} bytes",
1253                            size
1254                        );
1255                        size
1256                    } else {
1257                        // Default to 80% of system RAM
1258                        let size = (total_memory_bytes as f64 * 0.8) as usize;
1259                        debug!(
1260                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
1261                            size, total_memory_bytes
1262                        );
1263                        size
1264                    };
1265                    options.set_db_write_buffer_size(db_buffer_size);
1266
1267                    // Create column family specific options.
1268                    let mut table_config_map = BTreeMap::new();
1269
1270                    // Create options with compactions disabled and large write buffers.
1271                    // Each CF can use up to 25% of system RAM, but total is still limited by
1272                    // set_db_write_buffer_size configured above.
1273                    let mut cf_options = typed_store::rocks::default_db_options();
1274                    cf_options.options.set_disable_auto_compactions(true);
1275
1276                    let (buffer_size, buffer_count) = match (
1277                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
1278                        index_config
1279                            .as_ref()
1280                            .and_then(|c| c.cf_max_write_buffer_number),
1281                    ) {
1282                        (Some(size), Some(count)) => {
1283                            debug!(
1284                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
1285                                size, count
1286                            );
1287                            (size, count)
1288                        }
1289                        (None, None) => {
1290                            // Calculate buffer configuration: 25% of RAM split across buffers
1291                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
1292                            debug!(
1293                                "Column family memory budget: {} bytes (25% of {} total bytes)",
1294                                cf_memory_budget, total_memory_bytes
1295                            );
1296                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB minimum
1297
1298                            // Target number of buffers based on CPU count
1299                            // More CPUs = more parallel flushing capability
1300                            let target_buffer_count = num_cpus::get().max(2);
1301
1302                            // Aim for CPU-based buffer count, but reduce if it would make buffers too small
1303                            //   For example:
1304                            // - 128GB RAM, 32 CPUs: 32GB per CF / 32 buffers = 1GB each
1305                            // - 16GB RAM, 8 CPUs: 4GB per CF / 8 buffers = 512MB each
1306                            // - 4GB RAM, 8 CPUs: 1GB per CF / 64MB min = ~16 buffers of 64MB each
1307                            let buffer_size =
1308                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
1309                            let buffer_count = (cf_memory_budget / buffer_size)
1310                                .clamp(2, target_buffer_count)
1311                                as i32;
1312                            debug!(
1313                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
1314                                buffer_size, buffer_count, target_buffer_count
1315                            );
1316                            (buffer_size, buffer_count)
1317                        }
1318                        _ => {
1319                            panic!(
1320                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
1321                            );
1322                        }
1323                    };
1324
1325                    cf_options.options.set_write_buffer_size(buffer_size);
1326                    cf_options.options.set_max_write_buffer_number(buffer_count);
1327
1328                    // Calculate batch size limit: default to half the buffer size or 128MB, whichever is smaller
1329                    batch_size_limit = if let Some(limit) =
1330                        index_config.as_ref().and_then(|c| c.batch_size_limit)
1331                    {
1332                        debug!(
1333                            "Using config override for batch_size_limit: {} bytes",
1334                            limit
1335                        );
1336                        limit
1337                    } else {
1338                        let half_buffer = buffer_size / 2;
1339                        let default_limit = 1 << 27; // 128MB
1340                        let limit = half_buffer.min(default_limit);
1341                        debug!(
1342                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
1343                            limit, half_buffer, default_limit
1344                        );
1345                        limit
1346                    };
1347
1348                    // Apply cf_options to all tables
1349                    for (table_name, _) in IndexStoreTables::describe_tables() {
1350                        table_config_map.insert(table_name, cf_options.clone());
1351                    }
1352
1353                    // Override Balance options with the merge operator
1354                    let mut balance_options = cf_options.clone();
1355                    balance_options = balance_options.set_merge_operator_associative(
1356                        "balance_merge",
1357                        balance_delta_merge_operator,
1358                    );
1359                    table_config_map.insert("balance".to_string(), balance_options);

                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                // Flush all data to disk before dropping tables.
                // This is critical because WAL is disabled during bulk indexing.
                // Note we only need to call flush on one table because all tables share the same
                // underlying database.
                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

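                // RocksDB needs exclusive access to the path before it can be reopened,
                // so wait (bounded by a deadline) for every remaining Arc reference to
                // the underlying database to be dropped.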
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen the DB with default options (e.g. without `unordered_write`s enabled)
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: verify the database version was persisted correctly
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }

    pub fn new_without_init(dir: &Path) -> Self {
        let path = Self::db_path(dir);
        let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config: sui_config::RpcConfig::default(),
        }
    }

    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }

    /// Index a checkpoint and stage the index updates in `pending_updates`.
    ///
    /// Updates will not be committed to the database until `commit_update_for_checkpoint` is
    /// called.
    #[tracing::instrument(
        skip_all,
        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
    )]
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver, &self.rpc_config)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

    /// Commits the pending updates for the provided checkpoint number.
    ///
    /// Invariants:
    /// - `index_checkpoint` must have been called for the provided checkpoint
    /// - Callers of this function must ensure that it is called for each checkpoint in sequential
    ///   order. This will panic if the provided checkpoint does not match the expected next
    ///   checkpoint to commit.
    #[tracing::instrument(skip(self))]
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        // It's expected that the next batch exists
        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }
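
    // Illustrative driver loop (a sketch; `store`, `checkpoints`, and `resolver` are
    // assumed to exist, this is not a real call site in this crate):
    //
    //     for checkpoint in checkpoints {
    //         let seq = checkpoint.checkpoint_summary.sequence_number;
    //         store.index_checkpoint(&checkpoint, &mut resolver);
    //         store.commit_update_for_checkpoint(seq)?; // must be called in order
    //     }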

    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }

    pub fn get_transaction_info(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionInfo>, TypedStoreError> {
        self.tables.get_transaction_info(digest)
    }

    pub fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, object_type, cursor)
    }

    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }

    pub fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        self.tables.get_balance(owner, coin_type)
    }

    pub fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.balance_iter(owner, cursor)
    }

    pub fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.package_versions_iter(original_id, cursor)
    }

    pub fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.event_iter(
            stream_id,
            start_checkpoint,
            start_accumulator_version,
            start_transaction_idx,
            start_event_idx,
            end_checkpoint,
            limit,
        )
    }
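
    // Illustrative query (a sketch with hypothetical values): scan events for `stream`
    // from the start of checkpoint 100 through checkpoint 200, yielding at most 50 keys:
    //
    //     let keys: Vec<EventIndexKey> = index
    //         .event_iter(stream, 100, 0, 0, 0, 200, 50)?
    //         .collect::<Result<_, _>>()?;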

    pub fn get_highest_indexed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables.watermark.get(&Watermark::Indexed)
    }
}

fn should_index_dynamic_field(object: &Object) -> bool {
    // Skip any objects that aren't of type `Field<Name, Value>`
    //
    // All dynamic fields are of type:
    //   - Field<Name, Value> for dynamic fields
    //   - Field<Wrapper<Name>, ID> for dynamic field objects, where the ID is the id of the
    //     pointed-to object
    //
    object
        .data
        .try_as_move()
        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
}
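
// For example (illustrative): a `0x2::dynamic_field::Field<u64, address>` child object
// is indexed under its parent, while an object that is merely ObjectOwner-owned but not
// a `Field` is skipped.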

fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
    use sui_types::coin::CoinMetadata;
    use sui_types::coin::RegulatedCoinMetadata;
    use sui_types::coin::TreasuryCap;

    let object_type = object.type_().and_then(MoveObjectType::other)?;

    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
        return Some((
            CoinIndexKey { coin_type },
            CoinIndexInfo {
                coin_metadata_object_id: Some(object.id()),
                treasury_object_id: None,
                regulated_coin_metadata_object_id: None,
            },
        ));
    }

    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
        return Some((
            CoinIndexKey { coin_type },
            CoinIndexInfo {
                coin_metadata_object_id: None,
                treasury_object_id: Some(object.id()),
                regulated_coin_metadata_object_id: None,
            },
        ));
    }

    if let Some(coin_type) =
        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
    {
        return Some((
            CoinIndexKey { coin_type },
            CoinIndexInfo {
                coin_metadata_object_id: None,
                treasury_object_id: None,
                regulated_coin_metadata_object_id: Some(object.id()),
            },
        ));
    }

    None
}
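
// Each matching object populates exactly one of the three id fields; fragments for the
// same `CoinIndexKey` are combined via `CoinIndexInfo::merge` (illustratively, merging a
// CoinMetadata-only entry with a TreasuryCap-only entry yields one entry with both ids
// set). See the coin_index handling in `index_object` below.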

struct RpcParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    batch_size_limit: usize,
}

struct RpcLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    batch: typed_store::rocks::DBBatch,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    batch_size_limit: usize,
}

impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RpcLiveObjectIndexer<'a>;

    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RpcLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            balance_changes: HashMap::new(),
            batch_size_limit: self.batch_size_limit,
        }
    }
}

impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
        match object.owner {
            // Owner Index
            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
                let owner_key = OwnerIndexKey::from_object(&object);
                let owner_info = OwnerIndexInfo::new(&object);
                self.batch
                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;

                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
                    let balance_key = BalanceKey { owner, coin_type };
                    let balance_info = BalanceIndexInfo::from(value);
                    self.balance_changes
                        .entry(balance_key)
                        .or_default()
                        .merge_delta(&balance_info);

                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
                        self.batch.partial_merge_batch(
                            &self.tables.balance,
                            std::mem::take(&mut self.balance_changes),
                        )?;
                    }
                }
            }

            // Dynamic Field Index
            Owner::ObjectOwner(parent) => {
                if should_index_dynamic_field(&object) {
                    let field_key = DynamicFieldKey::new(parent, object.id());
                    self.batch
                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
                }
            }

            Owner::Shared { .. } | Owner::Immutable => {}
        }

        // Look for CoinMetadata<T>, TreasuryCap<T>, and RegulatedCoinMetadata<T> objects
        if let Some((key, value)) = try_create_coin_index_info(&object) {
            use std::collections::hash_map::Entry;

            match self.coin_index.lock().unwrap().entry(key) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge(value);
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }

        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
            self.batch
                .insert_batch(&self.tables.package_version, [(key, info)])?;
        }

        // If the batch grows beyond the limit, write it out to the DB so that the data we
        // need to hold in memory doesn't grow unbounded.
        if self.batch.size_in_bytes() >= self.batch_size_limit {
            std::mem::replace(&mut self.batch, self.tables.owner.batch())
                .write_opt(&bulk_ingestion_write_options())?;
        }

        Ok(())
    }

    fn finish(mut self) -> Result<(), StorageError> {
        self.batch.partial_merge_batch(
            &self.tables.balance,
            std::mem::take(&mut self.balance_changes),
        )?;
        self.batch.write_opt(&bulk_ingestion_write_options())?;
        Ok(())
    }
}
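
// Per-worker lifecycle, conceptually (a sketch; `par_indexer` and the shard loop are
// assumed, the actual rayon driver lives in par_index_live_object_set):
//
//     let mut worker = par_indexer.make_live_object_indexer(); // one per worker thread
//     for object in live_object_shard {
//         worker.index_object(object)?; // buffers writes, flushing at the size limit
//     }
//     worker.finish()?; // merge remaining balance deltas and write the final batch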

// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit of refactoring to
// make it possible.
fn sparse_checkpoint_data_for_backfill(
    authority_store: &AuthorityStore,
    checkpoint_store: &CheckpointStore,
    checkpoint: u64,
    load_events: bool,
) -> Result<CheckpointData, StorageError> {
    use sui_types::full_checkpoint_content::CheckpointTransaction;

    let summary = checkpoint_store
        .get_checkpoint_by_sequence_number(checkpoint)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
    let contents = checkpoint_store
        .get_checkpoint_contents(&summary.content_digest)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;

    let transaction_digests = contents
        .iter()
        .map(|execution_digests| execution_digests.transaction)
        .collect::<Vec<_>>();
    let transactions = authority_store
        .multi_get_transaction_blocks(&transaction_digests)?
        .into_iter()
        .map(|maybe_transaction| {
            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
        })
        .collect::<Result<Vec<_>, _>>()?;

    let effects = authority_store
        .multi_get_executed_effects(&transaction_digests)?
        .into_iter()
        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
        .collect::<Result<Vec<_>, _>>()?;

    let events = if load_events {
        authority_store
            .multi_get_events(&transaction_digests)
            .map_err(|e| StorageError::custom(e.to_string()))?
    } else {
        vec![None; transaction_digests.len()]
    };

    let mut full_transactions = Vec::with_capacity(transactions.len());
    for ((tx, fx), ev) in transactions.into_iter().zip(effects).zip(events) {
        let input_objects =
            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
        let output_objects =
            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;

        let full_transaction = CheckpointTransaction {
            transaction: tx.into(),
            effects: fx,
            events: ev,
            input_objects,
            output_objects,
        };

        full_transactions.push(full_transaction);
    }

    let checkpoint_data = CheckpointData {
        checkpoint_summary: summary.into(),
        checkpoint_contents: contents,
        transactions: full_transactions,
    };

    Ok(checkpoint_data)
}

fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
    match Coin::extract_balance_if_coin(object) {
        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
        Ok(Some(_)) => {
            debug!("Coin object {} has non-struct type tag", object.id());
            Ok(None)
        }
        Ok(None) => {
            // Not a coin
            Ok(None)
        }
        Err(e) => {
            // Corrupted coin data
            Err(StorageError::custom(format!(
                "Failed to deserialize coin object {}: {}",
                object.id(),
                e
            )))
        }
    }
}
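
// Expected behavior, illustratively: a `Coin<0x2::sui::SUI>` object holding 1_000 MIST
// yields `Ok(Some((<struct tag for 0x2::sui::SUI>, 1_000)))`, while a non-coin Move
// object yields `Ok(None)`.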

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        tables.events_by_stream.flush().unwrap();
        let events_before_compaction = tables
            .events_by_stream
            .safe_iter()
            .filter(|result| result.is_ok())
            .count();
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        println!("Events after compaction: {}", events_after_compaction.len());
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }
}
1983}