// sui_core/rpc_index.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use serde::Deserialize;
15use serde::Serialize;
16use std::collections::{BTreeMap, HashMap};
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::time::Duration;
22use std::time::Instant;
23use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
24use sui_types::base_types::MoveObjectType;
25use sui_types::base_types::ObjectID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::base_types::SuiAddress;
28use sui_types::coin::Coin;
29use sui_types::committee::EpochId;
30use sui_types::digests::TransactionDigest;
31use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
32use sui_types::full_checkpoint_content::CheckpointData;
33use sui_types::layout_resolver::LayoutResolver;
34use sui_types::messages_checkpoint::CheckpointContents;
35use sui_types::messages_checkpoint::CheckpointSequenceNumber;
36use sui_types::object::Data;
37use sui_types::object::Object;
38use sui_types::object::Owner;
39use sui_types::storage::BackingPackageStore;
40use sui_types::storage::DynamicFieldKey;
41use sui_types::storage::EpochInfo;
42use sui_types::storage::TransactionInfo;
43use sui_types::storage::error::Error as StorageError;
44use sui_types::sui_system_state::SuiSystemStateTrait;
45use sui_types::transaction::{TransactionDataAPI, TransactionKind};
46use sysinfo::{MemoryRefreshKind, RefreshKind, System};
47use tracing::{debug, info, warn};
48use typed_store::DBMapUtils;
49use typed_store::TypedStoreError;
50use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
51use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
52use typed_store::traits::Map;
53
/// Schema version of the index database. Bump this whenever a table is added
/// or an existing table's schema changes, so stale databases are reinitialized.
const CURRENT_DB_VERSION: u64 = 3;
// I tried increasing this to 100k and 1M and it didn't speed up indexing at all.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;
57
58fn bulk_ingestion_write_options() -> WriteOptions {
59    let mut opts = WriteOptions::default();
60    opts.disable_wal(true);
61    opts
62}
63
64/// Get available memory, respecting cgroup limits in containerized environments
65fn get_available_memory() -> u64 {
66    // RefreshKind::nothing().with_memory() avoids collecting other, slower stats
67    let mut sys = System::new_with_specifics(
68        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
69    );
70    sys.refresh_memory();
71
72    // Check if we have cgroup limits
73    if let Some(cgroup_limits) = sys.cgroup_limits() {
74        let memory_limit = cgroup_limits.total_memory;
75        // cgroup_limits.total_memory is 0 when there's no limit
76        if memory_limit > 0 {
77            debug!("Using cgroup memory limit: {} bytes", memory_limit);
78            return memory_limit;
79        }
80    }
81
82    // Fall back to system memory if no cgroup limits found
83    // sysinfo 0.35 already reports bytes (not KiB like older versions)
84    let total_memory_bytes = sys.total_memory();
85    debug!("Using system memory: {} bytes", total_memory_bytes);
86    total_memory_bytes
87}
88
/// Singleton value stored in the `meta` table describing the database itself.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}
94
/// Checkpoint watermark type
///
/// Keys of the `watermark` table: `Indexed` tracks the highest checkpoint
/// written into the index, `Pruned` the highest checkpoint pruned from it.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    Indexed,
    Pruned,
}
101
/// Key of the `owner` table. The derived ordering means a prefix scan over
/// `(owner, object_type)` yields coins from greatest to least balance
/// (see `inverted_balance` below).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    pub object_type: StructTag,

    // If this object is coin-like (eg 0x2::coin::Coin) then this will be the balance of the coin
    // inverted `!coin.balance` in order to force sorting of coins to be from greatest to least
    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}
114
115impl OwnerIndexKey {
116    // Creates a key from the provided object.
117    // Panics if the provided object is not an Address owned object
118    fn from_object(object: &Object) -> Self {
119        let owner = match object.owner() {
120            Owner::AddressOwner(owner) => owner,
121            Owner::ConsensusAddressOwner { owner, .. } => owner,
122            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
123        };
124        let object_type = object.struct_tag().expect("packages cannot be owned");
125
126        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
127
128        Self {
129            owner: *owner,
130            object_type,
131            inverted_balance,
132            object_id: object.id(),
133        }
134    }
135}
136
/// Value of the `owner` table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // object_id and type of this object are a part of the key
    pub version: SequenceNumber,
}
142
143impl OwnerIndexInfo {
144    pub fn new(object: &Object) -> Self {
145        Self {
146            version: object.version(),
147        }
148    }
149}
150
/// Key of the `coin` table: one entry per coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}
155
/// Key of the `balance` table: aggregate balance per (owner, coin type).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}
161
/// Value of the `coin` table: object ids related to a coin type, each
/// populated when known (see `CoinIndexInfo::merge`).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
168
/// Value of the `balance` table: a signed balance delta, combined
/// associatively by `balance_delta_merge_operator`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    // Signed (i128) so positive and negative updates can be merged; clamped to
    // the u64 range when converted to `BalanceInfo` for reads.
    pub balance_delta: i128,
}
173
174impl From<u64> for BalanceIndexInfo {
175    fn from(coin_value: u64) -> Self {
176        Self {
177            balance_delta: coin_value as i128,
178        }
179    }
180}
181
182impl BalanceIndexInfo {
183    fn merge_delta(&mut self, other: &Self) {
184        self.balance_delta += other.balance_delta;
185    }
186}
187
188impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
189    fn from(index_info: BalanceIndexInfo) -> Self {
190        // Note: We represent balance deltas as i128 to simplify merging positive and negative updates.
191        // Be aware: Move doesn’t enforce a one-time-witness (OTW) pattern when creating a Supply<T>.
192        // Anyone can call `sui::balance::create_supply` and mint unbounded supply, potentially pushing
193        // total balances over u64::MAX. To avoid crashing the indexer, we clamp the merged value instead
194        // of panicking on overflow. This has the unfortunate consequence of making bugs in the index
195        // harder to detect, but is a necessary trade-off to avoid creating a DOS attack vector.
196        let balance = index_info.balance_delta.clamp(0, u64::MAX as i128) as u64;
197        sui_types::storage::BalanceInfo { balance }
198    }
199}
200
/// Key of the `package_version` table: identifies one version of a package by
/// its original (first-published) package id plus the version number.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}
206
/// Value of the `package_version` table: the storage id of that package version.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}
211
/// Options controlling how the index store is opened.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    // Optional compaction filter installed on `events_by_stream` to drop
    // events at or below the pruning watermark during compaction.
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}
216
217fn default_table_options() -> typed_store::rocks::DBOptions {
218    typed_store::rocks::default_db_options().disable_write_throttling()
219}
220
221fn events_table_options(
222    compaction_filter: Option<EventsCompactionFilter>,
223) -> typed_store::rocks::DBOptions {
224    let mut options = default_table_options();
225    if let Some(filter) = compaction_filter {
226        options.options.set_compaction_filter(
227            "events_by_stream",
228            move |_, key, value| match filter.filter(key, value) {
229                Ok(decision) => decision,
230                Err(e) => {
231                    warn!(
232                        "Failed to parse event key during compaction: {}, key: {:?}",
233                        e, key
234                    );
235                    Decision::Remove
236                }
237            },
238        );
239    }
240    options
241}
242
243fn balance_delta_merge_operator(
244    _key: &[u8],
245    existing_val: Option<&[u8]>,
246    operands: &MergeOperands,
247) -> Option<Vec<u8>> {
248    let mut result = existing_val
249        .map(|v| {
250            bcs::from_bytes::<BalanceIndexInfo>(v)
251                .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.")
252        })
253        .unwrap_or_default();
254
255    for operand in operands.iter() {
256        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
257            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.");
258        result.merge_delta(&delta);
259    }
260    Some(
261        bcs::to_bytes(&result)
262            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
263    )
264}
265
266fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
267    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
268        Ok(info) => info,
269        Err(_) => return Decision::Keep,
270    };
271
272    if balance_info.balance_delta == 0 {
273        Decision::Remove
274    } else {
275        Decision::Keep
276    }
277}
278
279fn balance_table_options() -> typed_store::rocks::DBOptions {
280    default_table_options()
281        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
282        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
283}
284
285impl CoinIndexInfo {
286    fn merge(&mut self, other: Self) {
287        self.coin_metadata_object_id = self
288            .coin_metadata_object_id
289            .or(other.coin_metadata_object_id);
290        self.regulated_coin_metadata_object_id = self
291            .regulated_coin_metadata_object_id
292            .or(other.regulated_coin_metadata_object_id);
293        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
294    }
295}
296
/// RocksDB tables for the RpcIndexStore
///
/// Anytime a new table is added, or an existing one has its schema changed, make sure to also
/// update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that store metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still be empty post
    ///   initialization)
    /// - version of the DB. Every time a new table or schema is changed the version number needs to
    ///   be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in the event that the
    /// node was running with indexes enabled, then run for a period of time with indexes disabled,
    /// and then run with them enabled again so that the tables can be reinitialized.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a specific user
    /// account.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by a particular
    /// ObjectID.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the ObjectID of its
    /// corresponding CoinMetadata.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// An index of Balances.
    ///
    /// Allows looking up balances by owner address and coin type.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// An index of Package versions.
    ///
    /// Maps original package ID and version to the storage ID of that version.
    /// Allows efficient listing of all versions of a package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Authenticated events index by (stream_id, checkpoint_seq, transaction_idx, event_index)
    ///
    /// Table options (including the pruning compaction filter) are supplied at
    /// open time via `open_with_index_options` rather than an override fn here.
    events_by_stream: DBMap<EventIndexKey, ()>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}
378
/// Key of the `events_by_stream` table, ordered by
/// (stream_id, checkpoint_seq, accumulator_version, transaction_idx, event_index).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    /// The accumulator version that this event is settled into
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}
388
/// Compaction filter for automatic pruning of old authenticated events during RocksDB compaction.
#[derive(Clone)]
pub struct EventsCompactionFilter {
    // Events whose checkpoint_seq is at or below this watermark are removed
    // during compaction (see `filter`).
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}
394
395impl EventsCompactionFilter {
396    pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
397        Self { pruning_watermark }
398    }
399
400    pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
401        let event_key: EventIndexKey = bcs::from_bytes(key)?;
402        let watermark = self
403            .pruning_watermark
404            .load(std::sync::atomic::Ordering::Relaxed);
405
406        if event_key.checkpoint_seq <= watermark {
407            Ok(Decision::Remove)
408        } else {
409            Ok(Decision::Keep)
410        }
411    }
412}
413
414impl IndexStoreTables {
415    fn extract_version_if_package(
416        object: &Object,
417    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
418        if let Data::Package(package) = &object.data {
419            let original_id = package.original_package_id();
420            let version = package.version().value();
421            let storage_id = object.id();
422
423            let key = PackageVersionKey {
424                original_package_id: original_id,
425                version,
426            };
427            let info = PackageVersionInfo { storage_id };
428            return Some((key, info));
429        }
430        None
431    }
432
433    fn open_with_index_options<P: Into<PathBuf>>(
434        path: P,
435        index_options: IndexStoreOptions,
436    ) -> Self {
437        let mut table_options = std::collections::BTreeMap::new();
438        table_options.insert("balance".to_string(), balance_table_options());
439        table_options.insert(
440            "events_by_stream".to_string(),
441            events_table_options(index_options.events_compaction_filter),
442        );
443
444        IndexStoreTables::open_tables_read_write_with_deprecation_option(
445            path.into(),
446            MetricConf::new("rpc-index"),
447            None,
448            Some(DBMapTableConfigMap::new(table_options)),
449            true, // remove deprecated tables
450        )
451    }
452
453    fn open_with_options<P: Into<PathBuf>>(
454        path: P,
455        options: typed_store::rocksdb::Options,
456        table_options: Option<DBMapTableConfigMap>,
457    ) -> Self {
458        IndexStoreTables::open_tables_read_write_with_deprecation_option(
459            path.into(),
460            MetricConf::new("rpc-index"),
461            Some(options),
462            table_options,
463            true, // remove deprecated tables
464        )
465    }
466
467    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
468        (match self.meta.get(&()) {
469            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
470            Ok(None) => true,
471            Err(_) => true,
472        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
473    }
474
475    // Check if the index watermark is behind the highets_executed watermark.
476    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
477        let highest_executed_checkpint = checkpoint_store
478            .get_highest_executed_checkpoint_seq_number()
479            .ok()
480            .flatten();
481        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
482        watermark < highest_executed_checkpint
483    }
484
    /// (Re)builds the index tables from the node's existing stores, then
    /// stamps the `Indexed` watermark and the current `CURRENT_DB_VERSION`.
    ///
    /// Steps: backfill epoch info over the retained checkpoint range, seed the
    /// current epoch entry, and index the live object set in parallel.
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        let highest_executed_checkpint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        // Lowest checkpoint for which checkpoint data is still retained.
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Lowest checkpoint for which object data is still retained.
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Doing backfill requires processing objects so we have to restrict our backfill range
        // to the range of checkpoints that we have objects for.
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        // No range at all if nothing has been executed yet.
        let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
            lowest_available_checkpoint..=highest_executed_checkpint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_checkpoints(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        // Only index live objects if genesis checkpoint has been executed.
        // If genesis hasn't been executed yet, the objects will be properly indexed
        // as checkpoints are processed through the normal checkpoint execution path.
        if highest_executed_checkpint.is_some() {
            // Coin metadata discovered by the parallel indexers is accumulated
            // here and written out in one multi_insert below.
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpint.unwrap_or(0),
        )?;

        // Writing the metadata last marks initialization as complete for this
        // schema version (see `needs_to_do_initialization`).
        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }
564
    /// Backfills epoch information for every checkpoint in `checkpoint_range`,
    /// processing checkpoints in parallel with rayon.
    ///
    /// Per-checkpoint batches are written with the WAL disabled (bulk
    /// ingestion); checkpoints for which no data can be assembled are skipped.
    #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
    fn index_existing_checkpoints(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        checkpoint_range: std::ops::RangeInclusive<u64>,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!(
            "Indexing {} checkpoints in range {checkpoint_range:?}",
            checkpoint_range.size_hint().0
        );
        let start_time = Instant::now();

        checkpoint_range.into_par_iter().try_for_each(|seq| {
            let load_events = rpc_config.authenticated_events_indexing();
            // `None` means no data could be assembled for this checkpoint;
            // skip it rather than failing the whole backfill.
            let Some(checkpoint_data) = sparse_checkpoint_data_for_epoch_backfill(
                authority_store,
                checkpoint_store,
                seq,
                load_events,
            )?
            else {
                return Ok(());
            };

            let mut batch = self.epochs.batch();

            self.index_epoch(&checkpoint_data, &mut batch)?;

            batch
                .write_opt(bulk_ingestion_write_options())
                .map_err(StorageError::from)
        })?;

        info!(
            "Indexing checkpoints took {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }
606
607    /// Prune data from this Index
608    fn prune(
609        &self,
610        pruned_checkpoint_watermark: u64,
611        _checkpoint_contents_to_prune: &[CheckpointContents],
612    ) -> Result<(), TypedStoreError> {
613        let mut batch = self.watermark.batch();
614
615        batch.insert_batch(
616            &self.watermark,
617            [(Watermark::Pruned, pruned_checkpoint_watermark)],
618        )?;
619
620        batch.write()
621    }
622
623    /// Index a Checkpoint
624    fn index_checkpoint(
625        &self,
626        checkpoint: &CheckpointData,
627        _resolver: &mut dyn LayoutResolver,
628        rpc_config: &sui_config::RpcConfig,
629    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
630        debug!(
631            checkpoint = checkpoint.checkpoint_summary.sequence_number,
632            "indexing checkpoint"
633        );
634
635        let mut batch = self.owner.batch();
636
637        self.index_epoch(checkpoint, &mut batch)?;
638        self.index_transactions(
639            checkpoint,
640            &mut batch,
641            rpc_config.authenticated_events_indexing(),
642        )?;
643        self.index_objects(checkpoint, &mut batch)?;
644
645        batch.insert_batch(
646            &self.watermark,
647            [(
648                Watermark::Indexed,
649                checkpoint.checkpoint_summary.sequence_number,
650            )],
651        )?;
652
653        debug!(
654            checkpoint = checkpoint.checkpoint_summary.sequence_number,
655            "finished indexing checkpoint"
656        );
657
658        Ok(batch)
659    }
660
661    fn extract_accumulator_version(
662        &self,
663        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
664    ) -> Option<u64> {
665        let TransactionKind::ProgrammableSystemTransaction(pt) =
666            tx.transaction.transaction_data().kind()
667        else {
668            return None;
669        };
670
671        if pt.shared_input_objects().any(|obj| {
672            obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
673                && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
674        }) {
675            return tx.output_objects.iter().find_map(|obj| {
676                if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
677                    Some(obj.version().value())
678                } else {
679                    None
680                }
681            });
682        }
683
684        None
685    }
686
    /// Indexes authenticated events emitted by `tx` into `events_by_stream`.
    ///
    /// Only accumulator events carrying event digests are indexed, keyed by
    /// the stream id derived from the accumulator event. `accumulator_version`
    /// is the settlement version the events are attributed to; `None` here is
    /// a bug and triggers a `debug_fatal` (the event is skipped in release).
    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            // Only event-digest accumulator writes correspond to indexable events.
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                // Events without a derivable stream id are not indexed.
                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }
734
    /// Indexes epoch info from `checkpoint` into the `epochs` table.
    ///
    /// Only acts when `checkpoint.epoch_info()` returns `Some` (presumably at
    /// epoch boundaries — confirm against its implementation). Closes out the
    /// previous epoch's end timestamp/checkpoint, then writes the new entry.
    fn index_epoch(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let Some(epoch_info) = checkpoint.epoch_info()? else {
            return Ok(());
        };
        if epoch_info.epoch > 0 {
            let prev_epoch = epoch_info.epoch - 1;
            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
            current_epoch.epoch = prev_epoch; // set this in case there wasn't an entry
            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
        }
        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
        Ok(())
    }
754
755    // After attempting to reindex past epochs, ensure that the current epoch is at least partially
756    // initalized
757    fn initialize_current_epoch(
758        &mut self,
759        authority_store: &AuthorityStore,
760        checkpoint_store: &CheckpointStore,
761    ) -> Result<(), StorageError> {
762        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
763            return Ok(());
764        };
765
766        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
767            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
768
769        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
770        epoch.epoch = checkpoint.epoch;
771
772        if epoch.protocol_version.is_none() {
773            epoch.protocol_version = Some(system_state.protocol_version());
774        }
775
776        if epoch.start_timestamp_ms.is_none() {
777            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
778        }
779
780        if epoch.reference_gas_price.is_none() {
781            epoch.reference_gas_price = Some(system_state.reference_gas_price());
782        }
783
784        if epoch.system_state.is_none() {
785            epoch.system_state = Some(system_state);
786        }
787
788        self.epochs.insert(&epoch.epoch, &epoch)?;
789
790        Ok(())
791    }
792
    /// Indexes per-transaction data for `checkpoint`: balance deltas for every
    /// transaction, and (when `index_events` is set) authenticated events.
    ///
    /// Transactions are walked in reverse so that an accumulator settlement's
    /// version is known before the earlier transactions whose events are
    /// attributed to it.
    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        // Version of the most recently seen accumulator settlement (later in
        // the checkpoint than the transaction currently being processed).
        let mut current_accumulator_version: Option<u64> = None;

        // iterate in reverse order, process accumulator settlements first
        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            // Derive per-(owner, coin type) balance deltas; changes whose coin
            // type is not a struct tag are skipped.
            let balance_changes = sui_types::balance_change::derive_balance_changes(
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
            )
            .into_iter()
            .filter_map(|change| {
                if let TypeTag::Struct(coin_type) = change.coin_type {
                    Some((
                        BalanceKey {
                            owner: change.address,
                            coin_type: *coin_type,
                        },
                        BalanceIndexInfo {
                            balance_delta: change.amount,
                        },
                    ))
                } else {
                    None
                }
            });
            // Deltas are combined with existing values via the balance table's
            // merge operator rather than overwriting them.
            batch.partial_merge_batch(&self.balance, balance_changes)?;

            if index_events {
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }
844
    /// Stage object index updates for every transaction in `checkpoint`:
    /// the owner index, the dynamic-field index, the package-version index,
    /// and the coin (metadata/treasury) index.
    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        // Coin and package-version entries are accumulated across all
        // transactions and written once at the end.
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            // determine changes from removed objects
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    // Shared and immutable objects are not tracked by these
                    // per-owner indexes.
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            // determine changes from changed objects
            for (object, old_object) in tx.changed_objects() {
                // First clear any stale entries recorded for the previous
                // version of the object.
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            // When the owner is unchanged the key is left in
                            // place (and re-inserted below if the object is
                            // still a dynamic field).
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // Then record entries for the new version of the object.
                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // coin indexing
            //
            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are created in
            // the same transaction so we don't need to worry about overriding any older value
            // that may exist in the database (because there necessarily cannot be).
            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                // Merge entries for the same coin type discovered in this
                // checkpoint (e.g. metadata and treasury cap).
                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
936
    /// Fetch the indexed [`EpochInfo`] for `epoch`, if present.
    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.epochs.get(&epoch)
    }
940
941    fn event_iter(
942        &self,
943        stream_id: SuiAddress,
944        start_checkpoint: u64,
945        start_accumulator_version: u64,
946        start_transaction_idx: u32,
947        start_event_idx: u32,
948        end_checkpoint: u64,
949        limit: u32,
950    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
951    {
952        let lower = EventIndexKey {
953            stream_id,
954            checkpoint_seq: start_checkpoint,
955            accumulator_version: start_accumulator_version,
956            transaction_idx: start_transaction_idx,
957            event_index: start_event_idx,
958        };
959        let upper = EventIndexKey {
960            stream_id,
961            checkpoint_seq: end_checkpoint,
962            accumulator_version: u64::MAX,
963            transaction_idx: u32::MAX,
964            event_index: u32::MAX,
965        };
966
967        Ok(self
968            .events_by_stream
969            .safe_iter_with_bounds(Some(lower), Some(upper))
970            .map(|res| res.map(|(k, _)| k))
971            .take(limit as usize))
972    }
973
974    fn owner_iter(
975        &self,
976        owner: SuiAddress,
977        object_type: Option<StructTag>,
978        cursor: Option<OwnerIndexKey>,
979    ) -> Result<
980        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
981        TypedStoreError,
982    > {
983        // TODO can we figure out how to pass a raw byte array as a cursor?
984        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
985            owner,
986            object_type: object_type
987                .clone()
988                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
989            inverted_balance: None,
990            object_id: ObjectID::ZERO,
991        });
992
993        Ok(self
994            .owner
995            .safe_iter_with_bounds(Some(lower_bound), None)
996            .take_while(move |item| {
997                // If there's an error let if flow through
998                let Ok((key, _)) = item else {
999                    return true;
1000                };
1001
1002                // Only take if owner matches
1003                key.owner == owner
1004                    // and if an object type was supplied that the type matches
1005                    && object_type
1006                        .as_ref()
1007                        .map(|ty| {
1008                            ty.address == key.object_type.address
1009                                && ty.module == key.object_type.module
1010                                && ty.name == key.object_type.name
1011                                // If type_params are not provided then we match all params
1012                                && (ty.type_params.is_empty() ||
1013                                    // If they are provided the type params must match
1014                                    ty.type_params == key.object_type.type_params)
1015                        }).unwrap_or(true)
1016            }))
1017    }
1018
1019    fn dynamic_field_iter(
1020        &self,
1021        parent: ObjectID,
1022        cursor: Option<ObjectID>,
1023    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1024    {
1025        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1026        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1027        let iter = self
1028            .dynamic_field
1029            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1030            .map_ok(|(key, ())| key);
1031        Ok(iter)
1032    }
1033
1034    fn get_coin_info(
1035        &self,
1036        coin_type: &StructTag,
1037    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1038        let key = CoinIndexKey {
1039            coin_type: coin_type.to_owned(),
1040        };
1041        self.coin.get(&key)
1042    }
1043
1044    fn get_balance(
1045        &self,
1046        owner: &SuiAddress,
1047        coin_type: &StructTag,
1048    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1049        let key = BalanceKey {
1050            owner: owner.to_owned(),
1051            coin_type: coin_type.to_owned(),
1052        };
1053        self.balance.get(&key)
1054    }
1055
1056    fn balance_iter(
1057        &self,
1058        owner: SuiAddress,
1059        cursor: Option<BalanceKey>,
1060    ) -> Result<
1061        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1062        TypedStoreError,
1063    > {
1064        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1065            owner,
1066            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1067        });
1068
1069        Ok(self
1070            .balance
1071            .safe_iter_with_bounds(Some(lower_bound), None)
1072            .scan((), move |_, item| {
1073                match item {
1074                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1075                    Ok(_) => None,          // Different owner, stop iteration
1076                    Err(e) => Some(Err(e)), // Propagate error
1077                }
1078            }))
1079    }
1080
1081    fn package_versions_iter(
1082        &self,
1083        original_id: ObjectID,
1084        cursor: Option<u64>,
1085    ) -> Result<
1086        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1087        TypedStoreError,
1088    > {
1089        let lower_bound = PackageVersionKey {
1090            original_package_id: original_id,
1091            version: cursor.unwrap_or(0),
1092        };
1093        let upper_bound = PackageVersionKey {
1094            original_package_id: original_id,
1095            version: u64::MAX,
1096        };
1097
1098        Ok(self
1099            .package_version
1100            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1101    }
1102}
1103
/// RPC index database together with a staging area for checkpoint batches
/// that have been indexed but not yet committed.
pub struct RpcIndexStore {
    tables: IndexStoreTables,
    /// Batches staged by `index_checkpoint`, keyed by checkpoint sequence
    /// number and drained in order by `commit_update_for_checkpoint`.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}
1109
impl RpcIndexStore {
    /// Given the provided directory, construct the path to the db
    fn db_path(dir: &Path) -> PathBuf {
        dir.join("rpc-index")
    }

    /// Open the RPC index under `dir`, initializing it from the authority
    /// state if required. The events compaction filter is driven by
    /// `pruning_watermark`.
    pub async fn new(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let events_filter = EventsCompactionFilter::new(pruning_watermark);
        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(events_filter),
        };

        Self::new_with_options(
            dir,
            authority_store,
            checkpoint_store,
            epoch_store,
            package_store,
            index_options,
            rpc_config,
        )
        .await
    }

    /// Open the RPC index under `dir` with explicit `index_options`.
    ///
    /// If the on-disk tables are uninitialized or at an older version, the
    /// database is destroyed and rebuilt: it is reopened with
    /// bulk-load-friendly RocksDB options (unordered writes, auto-compaction
    /// disabled, large write buffers), populated via `init`, flushed, fully
    /// closed, and finally reopened with the normal options.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            // If the index tables are uninitialized or on an older version then we need to
            // populate them
            if tables.needs_to_do_initialization(checkpoint_store) {
                // Deferred: assigned below once buffer sizing is known.
                let batch_size_limit;

                let mut tables = {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Open the empty DB with `unordered_write`s enabled in order to get a ~3x
                    // speedup when indexing
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Allow CPU-intensive flushing operations to use all CPUs.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // We are disabling compaction for all column families below. This means we can
                    // also disable the backpressure that slows down writes when the number of L0
                    // files builds up since we will never compact them anyway.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // This is an upper bound on the amount to of ram the memtables can use across
                    // all column families.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        // Default to 80% of system RAM
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Create column family specific options.
                    let mut table_config_map = BTreeMap::new();

                    // Create options with compactions disabled and large write buffers.
                    // Each CF can use up to 25% of system RAM, but total is still limited by
                    // set_db_write_buffer_size configured above.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    // Buffer size and count must be overridden together or
                    // not at all; mixing one override with one default panics.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Calculate buffer configuration: 25% of RAM split across buffers
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB minimum

                            // Target number of buffers based on CPU count
                            // More CPUs = more parallel flushing capability
                            let target_buffer_count = num_cpus::get().max(2);

                            // Aim for CPU-based buffer count, but reduce if it would make buffers too small
                            //   For example:
                            // - 128GB RAM, 32 CPUs: 32GB per CF / 32 buffers = 1GB each
                            // - 16GB RAM, 8 CPUs: 4GB per CF / 8 buffers = 512MB each
                            // - 4GB RAM, 8 CPUs: 1GB per CF / 64MB min = ~16 buffers of 64MB each
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Calculate batch size limit: default to half the buffer size or 128MB, whichever is smaller
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27; // 128MB
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply cf_options to all tables
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // Override Balance options with the merge operator
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                // Flush all data to disk before dropping tables.
                // This is critical because WAL is disabled during bulk indexing.
                // Note we only need to call flush on one table because all tables share the same
                // underlying database.
                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Hold a weak handle so we can tell when every strong
                // reference to the underlying DB has been released.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                // Wait (up to 30 seconds) for all outstanding references to
                // the DB to be dropped before attempting to reopen it.
                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen the DB with default options (eg without `unordered_write`s enabled)
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: verify the database version was persisted correctly
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }

    /// Open the index tables at `dir` without checking for, or performing,
    /// any initialization/backfill. Uses a default `RpcConfig`.
    pub fn new_without_init(dir: &Path) -> Self {
        let path = Self::db_path(dir);
        let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config: sui_config::RpcConfig::default(),
        }
    }

    /// Prune index entries for the provided checkpoint contents, up to
    /// `pruned_checkpoint_watermark`; delegates to `IndexStoreTables::prune`.
    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }

    /// Index a checkpoint and stage the index updated in `pending_updates`.
    ///
    /// Updates will not be committed to the database until `commit_update_for_checkpoint` is
    /// called.
    #[tracing::instrument(
        skip_all,
        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
    )]
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver, &self.rpc_config)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

    /// Commits the pending updates for the provided checkpoint number.
    ///
    /// Invariants:
    /// - `index_checkpoint` must have been called for the provided checkpoint
    /// - Callers of this function must ensure that it is called for each checkpoint in sequential
    ///   order. This will panic if the provided checkpoint does not match the expected next
    ///   checkpoint to commit.
    #[tracing::instrument(skip(self))]
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        // Its expected that the next batch exists
        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    /// Fetch the indexed [`EpochInfo`] for `epoch`, if present.
    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }

    /// Iterate objects owned by `owner`, optionally filtered by object type,
    /// resuming from `cursor` when provided.
    pub fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, object_type, cursor)
    }

    /// Iterate the dynamic fields of `parent`, resuming from `cursor`.
    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    /// Look up indexed coin metadata/treasury info for `coin_type`.
    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }

    /// Look up the indexed balance for the (`owner`, `coin_type`) pair.
    pub fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        self.tables.get_balance(owner, coin_type)
    }

    /// Iterate all indexed balances for `owner`, resuming from `cursor`.
    pub fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.balance_iter(owner, cursor)
    }

    /// Iterate indexed versions of the package whose original id is
    /// `original_id`, resuming from version `cursor`.
    pub fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.package_versions_iter(original_id, cursor)
    }

    /// Iterate event index keys for `stream_id` between the given start
    /// position and the end of `end_checkpoint`, yielding at most `limit`
    /// entries.
    pub fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.event_iter(
            stream_id,
            start_checkpoint,
            start_accumulator_version,
            start_transaction_idx,
            start_event_idx,
            end_checkpoint,
            limit,
        )
    }

    /// Highest checkpoint sequence number recorded under the `Indexed`
    /// watermark, if any checkpoint has been indexed yet.
    pub fn get_highest_indexed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables.watermark.get(&Watermark::Indexed)
    }
}
1539
1540fn should_index_dynamic_field(object: &Object) -> bool {
1541    // Skip any objects that aren't of type `Field<Name, Value>`
1542    //
1543    // All dynamic fields are of type:
1544    //   - Field<Name, Value> for dynamic fields
1545    //   - Field<Wrapper<Name>, ID>> for dynamic field objects where the ID is the id of the pointed
1546    //   to object
1547    //
1548    object
1549        .data
1550        .try_as_move()
1551        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
1552}
1553
1554fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
1555    use sui_types::coin::CoinMetadata;
1556    use sui_types::coin::RegulatedCoinMetadata;
1557    use sui_types::coin::TreasuryCap;
1558
1559    let object_type = object.type_().and_then(MoveObjectType::other)?;
1560
1561    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
1562        return Some((
1563            CoinIndexKey { coin_type },
1564            CoinIndexInfo {
1565                coin_metadata_object_id: Some(object.id()),
1566                treasury_object_id: None,
1567                regulated_coin_metadata_object_id: None,
1568            },
1569        ));
1570    }
1571
1572    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
1573        return Some((
1574            CoinIndexKey { coin_type },
1575            CoinIndexInfo {
1576                coin_metadata_object_id: None,
1577                treasury_object_id: Some(object.id()),
1578                regulated_coin_metadata_object_id: None,
1579            },
1580        ));
1581    }
1582
1583    if let Some(coin_type) =
1584        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
1585    {
1586        return Some((
1587            CoinIndexKey { coin_type },
1588            CoinIndexInfo {
1589                coin_metadata_object_id: None,
1590                treasury_object_id: None,
1591                regulated_coin_metadata_object_id: Some(object.id()),
1592            },
1593        ));
1594    }
1595
1596    None
1597}
1598
/// Factory that hands out `RpcLiveObjectIndexer`s for the parallel iteration
/// over the live object set (see `ParMakeLiveObjectIndexer`).
struct RpcParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    // Shared across all indexers so coin-index entries for the same coin type
    // discovered by different indexers can be merged under one lock.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Byte-size threshold at which an indexer flushes its write batch.
    batch_size_limit: usize,
}
1604
/// Indexer that processes a portion of the live object set, accumulating
/// writes into a RocksDB batch that is periodically flushed.
struct RpcLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    // Pending writes; flushed whenever its size exceeds `batch_size_limit`.
    batch: typed_store::rocks::DBBatch,
    // Shared, mutex-guarded coin index (merged across indexers).
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // In-memory balance deltas, merged into `batch` once the map reaches
    // BALANCE_FLUSH_THRESHOLD entries (and unconditionally in `finish`).
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    batch_size_limit: usize,
}
1612
impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RpcLiveObjectIndexer<'a>;

    /// Creates a fresh indexer with its own write batch and balance-delta
    /// map, sharing the table handles and the mutex-guarded coin index.
    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RpcLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            balance_changes: HashMap::new(),
            batch_size_limit: self.batch_size_limit,
        }
    }
}
1626
1627impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
1628    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
1629        match object.owner {
1630            // Owner Index
1631            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
1632                let owner_key = OwnerIndexKey::from_object(&object);
1633                let owner_info = OwnerIndexInfo::new(&object);
1634                self.batch
1635                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
1636
1637                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
1638                    let balance_key = BalanceKey { owner, coin_type };
1639                    let balance_info = BalanceIndexInfo::from(value);
1640                    self.balance_changes
1641                        .entry(balance_key)
1642                        .or_default()
1643                        .merge_delta(&balance_info);
1644
1645                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1646                        self.batch.partial_merge_batch(
1647                            &self.tables.balance,
1648                            std::mem::take(&mut self.balance_changes),
1649                        )?;
1650                    }
1651                }
1652            }
1653
1654            // Dynamic Field Index
1655            Owner::ObjectOwner(parent) => {
1656                if should_index_dynamic_field(&object) {
1657                    let field_key = DynamicFieldKey::new(parent, object.id());
1658                    self.batch
1659                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
1660                }
1661
1662                // Index address balances
1663                if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
1664                    && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
1665                {
1666                    let balance_key = BalanceKey { owner, coin_type };
1667                    let balance_info = BalanceIndexInfo {
1668                        balance_delta: balance,
1669                    };
1670                    self.balance_changes
1671                        .entry(balance_key)
1672                        .or_default()
1673                        .merge_delta(&balance_info);
1674
1675                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1676                        self.batch.partial_merge_batch(
1677                            &self.tables.balance,
1678                            std::mem::take(&mut self.balance_changes),
1679                        )?;
1680                    }
1681                }
1682            }
1683
1684            Owner::Shared { .. } | Owner::Immutable => {}
1685        }
1686
1687        // Look for CoinMetadata<T> and TreasuryCap<T> objects
1688        if let Some((key, value)) = try_create_coin_index_info(&object) {
1689            use std::collections::hash_map::Entry;
1690
1691            match self.coin_index.lock().unwrap().entry(key) {
1692                Entry::Occupied(mut o) => {
1693                    o.get_mut().merge(value);
1694                }
1695                Entry::Vacant(v) => {
1696                    v.insert(value);
1697                }
1698            }
1699        }
1700
1701        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
1702            self.batch
1703                .insert_batch(&self.tables.package_version, [(key, info)])?;
1704        }
1705
1706        // If the batch size grows to greater than the limit then write out to the DB so that the
1707        // data we need to hold in memory doesn't grow unbounded.
1708        if self.batch.size_in_bytes() >= self.batch_size_limit {
1709            std::mem::replace(&mut self.batch, self.tables.owner.batch())
1710                .write_opt(bulk_ingestion_write_options())?;
1711        }
1712
1713        Ok(())
1714    }
1715
1716    fn finish(mut self) -> Result<(), StorageError> {
1717        self.batch.partial_merge_batch(
1718            &self.tables.balance,
1719            std::mem::take(&mut self.balance_changes),
1720        )?;
1721        self.batch.write_opt(bulk_ingestion_write_options())?;
1722        Ok(())
1723    }
1724}
1725
1726// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit of refactoring to
1727// make it possible.
1728fn sparse_checkpoint_data_for_epoch_backfill(
1729    authority_store: &AuthorityStore,
1730    checkpoint_store: &CheckpointStore,
1731    checkpoint: u64,
1732    load_events: bool,
1733) -> Result<Option<CheckpointData>, StorageError> {
1734    use sui_types::full_checkpoint_content::CheckpointTransaction;
1735
1736    let summary = checkpoint_store
1737        .get_checkpoint_by_sequence_number(checkpoint)?
1738        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1739
1740    // Only load genesis and end of epoch checkpoints
1741    if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
1742        return Ok(None);
1743    }
1744
1745    let contents = checkpoint_store
1746        .get_checkpoint_contents(&summary.content_digest)?
1747        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1748
1749    let transaction_digests = contents
1750        .iter()
1751        .map(|execution_digests| execution_digests.transaction)
1752        .collect::<Vec<_>>();
1753    let transactions = authority_store
1754        .multi_get_transaction_blocks(&transaction_digests)?
1755        .into_iter()
1756        .map(|maybe_transaction| {
1757            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
1758        })
1759        .collect::<Result<Vec<_>, _>>()?;
1760
1761    let effects = authority_store
1762        .multi_get_executed_effects(&transaction_digests)?
1763        .into_iter()
1764        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
1765        .collect::<Result<Vec<_>, _>>()?;
1766
1767    let events = if load_events {
1768        authority_store
1769            .multi_get_events(&transaction_digests)
1770            .map_err(|e| StorageError::custom(e.to_string()))?
1771    } else {
1772        vec![None; transaction_digests.len()]
1773    };
1774
1775    let mut full_transactions = Vec::with_capacity(transactions.len());
1776    for ((tx, fx), ev) in transactions
1777        .into_iter()
1778        .zip_debug_eq(effects)
1779        .zip_debug_eq(events)
1780    {
1781        let input_objects =
1782            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
1783        let output_objects =
1784            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
1785
1786        let full_transaction = CheckpointTransaction {
1787            transaction: tx.into(),
1788            effects: fx,
1789            events: ev,
1790            input_objects,
1791            output_objects,
1792        };
1793
1794        full_transactions.push(full_transaction);
1795    }
1796
1797    let checkpoint_data = CheckpointData {
1798        checkpoint_summary: summary.into(),
1799        checkpoint_contents: contents,
1800        transactions: full_transactions,
1801    };
1802
1803    Ok(Some(checkpoint_data))
1804}
1805
1806fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
1807    match Coin::extract_balance_if_coin(object) {
1808        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
1809        Ok(Some(_)) => {
1810            debug!("Coin object {} has non-struct type tag", object.id());
1811            Ok(None)
1812        }
1813        Ok(None) => {
1814            // Not a coin
1815            Ok(None)
1816        }
1817        Err(e) => {
1818            // Corrupted coin data
1819            Err(StorageError::custom(format!(
1820                "Failed to deserialize coin object {}: {}",
1821                object.id(),
1822                e
1823            )))
1824        }
1825    }
1826}
1827
1828fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
1829    let move_object = object.data.try_as_move()?;
1830
1831    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
1832    else {
1833        return None;
1834    };
1835
1836    let (key, value): (
1837        sui_types::accumulator_root::AccumulatorKey,
1838        sui_types::accumulator_root::AccumulatorValue,
1839    ) = move_object.try_into().ok()?;
1840
1841    let balance = value.as_u128()? as i128;
1842    if balance <= 0 {
1843        return None;
1844    }
1845
1846    Some((key.owner, *coin_type, balance))
1847}
1848
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    /// End-to-end check that the events compaction filter, installed via
    /// `IndexStoreOptions`, prunes event keys at or below the shared pruning
    /// watermark when a manual range compaction runs.
    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        // Filter shares the watermark atomically; events with
        // checkpoint_seq <= 5 are eligible for pruning.
        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        // Keys straddle the watermark: 1, 3, 5 are prunable; 10, 15 are not.
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        // Flush so the data is in SST files that compaction can process.
        tables.events_by_stream.flush().unwrap();
        let mut events_before_compaction = 0;
        for result in tables.events_by_stream.safe_iter() {
            if result.is_ok() {
                events_before_compaction += 1;
            }
        }
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        // Compact the full key range so the filter sees every event.
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        // NOTE(review): the loose `>= 2` bound presumably allows for
        // compaction not visiting every SST — confirm; an exact count with
        // specific surviving keys would be a stronger assertion.
        println!("Events after compaction: {}", events_after_compaction.len());
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        // NOTE(review): the store/load below only exercises the AtomicU64
        // itself — no further compaction runs, so this does not verify the
        // filter observes the updated watermark.
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    /// Unit test of the filter decision logic: keys strictly above the
    /// watermark are kept; keys at or below it are removed.
    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        // Below the watermark -> pruned.
        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        // Above the watermark -> kept.
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        // Exactly at the watermark -> pruned (boundary is inclusive).
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }
}