// sui_core/rpc_index.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use serde::Deserialize;
15use serde::Serialize;
16use std::collections::{BTreeMap, HashMap};
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::time::Duration;
22use std::time::Instant;
23use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
24use sui_types::base_types::MoveObjectType;
25use sui_types::base_types::ObjectID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::base_types::SuiAddress;
28use sui_types::coin::Coin;
29use sui_types::committee::EpochId;
30use sui_types::digests::TransactionDigest;
31use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
32use sui_types::full_checkpoint_content::CheckpointData;
33use sui_types::layout_resolver::LayoutResolver;
34use sui_types::messages_checkpoint::CheckpointContents;
35use sui_types::messages_checkpoint::CheckpointSequenceNumber;
36use sui_types::object::Data;
37use sui_types::object::Object;
38use sui_types::object::Owner;
39use sui_types::storage::BackingPackageStore;
40use sui_types::storage::DynamicFieldKey;
41use sui_types::storage::EpochInfo;
42use sui_types::storage::TransactionInfo;
43use sui_types::storage::error::Error as StorageError;
44use sui_types::sui_system_state::SuiSystemStateTrait;
45use sui_types::transaction::{TransactionDataAPI, TransactionKind};
46use sysinfo::{MemoryRefreshKind, RefreshKind, System};
47use tracing::{debug, info, warn};
48use typed_store::DBMapUtils;
49use typed_store::TypedStoreError;
50use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
51use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
52use typed_store::traits::Map;
53
/// Schema version of the index database. Bump this whenever a table is added
/// or an existing table's key/value encoding changes (see `IndexStoreTables`);
/// a mismatch triggers reinitialization in `needs_to_do_initialization`.
const CURRENT_DB_VERSION: u64 = 4;
// I tried increasing this to 100k and 1M and it didn't speed up indexing at all.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;
57
58fn bulk_ingestion_write_options() -> WriteOptions {
59    let mut opts = WriteOptions::default();
60    opts.disable_wal(true);
61    opts
62}
63
64/// Get available memory, respecting cgroup limits in containerized environments
65fn get_available_memory() -> u64 {
66    // RefreshKind::nothing().with_memory() avoids collecting other, slower stats
67    let mut sys = System::new_with_specifics(
68        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
69    );
70    sys.refresh_memory();
71
72    // Check if we have cgroup limits
73    if let Some(cgroup_limits) = sys.cgroup_limits() {
74        let memory_limit = cgroup_limits.total_memory;
75        // cgroup_limits.total_memory is 0 when there's no limit
76        if memory_limit > 0 {
77            debug!("Using cgroup memory limit: {} bytes", memory_limit);
78            return memory_limit;
79        }
80    }
81
82    // Fall back to system memory if no cgroup limits found
83    // sysinfo 0.35 already reports bytes (not KiB like older versions)
84    let total_memory_bytes = sys.total_memory();
85    debug!("Using system memory: {} bytes", total_memory_bytes);
86    total_memory_bytes
87}
88
/// Singleton value stored in the `meta` table (under the unit key `()`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}
94
/// Checkpoint watermark type
///
/// Used as the key of the `watermark` table: `Indexed` tracks the highest
/// checkpoint written into the index, `Pruned` the highest checkpoint pruned.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    Indexed,
    Pruned,
}
101
/// Key of the `owner` table. Field order matters: the serialized key sorts by
/// owner, then object type, then (for coins) descending balance, then id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    /// Move struct type of the owned object (packages cannot be owned).
    pub object_type: StructTag,

    // If this object is coin-like (eg 0x2::coin::Coin) then this will be the balance of the coin
    // inverted `!coin.balance` in order to force sorting of coins to be from greatest to least
    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}
114
115impl OwnerIndexKey {
116    // Creates a key from the provided object.
117    // Panics if the provided object is not an Address owned object
118    fn from_object(object: &Object) -> Self {
119        let owner = match object.owner() {
120            Owner::AddressOwner(owner) => owner,
121            Owner::ConsensusAddressOwner { owner, .. } => owner,
122            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
123        };
124        let object_type = object.struct_tag().expect("packages cannot be owned");
125
126        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
127
128        Self {
129            owner: *owner,
130            object_type,
131            inverted_balance,
132            object_id: object.id(),
133        }
134    }
135}
136
/// Value of the `owner` table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // object_id and type of this object are a part of the key
    pub version: SequenceNumber,
}
142
143impl OwnerIndexInfo {
144    pub fn new(object: &Object) -> Self {
145        Self {
146            version: object.version(),
147        }
148    }
149}
150
/// Key of the `coin` table: one entry per coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}
155
/// Key of the `balance` table: balances are tracked per (owner, coin type).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}
161
/// Value of the `coin` table: object ids related to a coin type. Any field
/// may be absent; entries are combined field-wise by `CoinIndexInfo::merge`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
168
/// Value of the `balance` table: signed deltas combined at read/compaction
/// time by the RocksDB merge operator (`balance_delta_merge_operator`).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    // Deltas use i128 so positive and negative updates merge without overflow.
    pub coin_balance_delta: i128,
    pub address_balance_delta: i128,
}
174
175impl BalanceIndexInfo {
176    fn merge_delta(&mut self, other: &Self) {
177        self.coin_balance_delta = self
178            .coin_balance_delta
179            .saturating_add(other.coin_balance_delta);
180        self.address_balance_delta = self
181            .address_balance_delta
182            .saturating_add(other.address_balance_delta);
183    }
184}
185
186impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
187    fn from(index_info: BalanceIndexInfo) -> Self {
188        // Note: We represent balance deltas as i128 to simplify merging positive and negative updates.
189        // Be aware: Move doesn’t enforce a one-time-witness (OTW) pattern when creating a Supply<T>.
190        // Anyone can call `sui::balance::create_supply` and mint unbounded supply, potentially pushing
191        // total balances over u64::MAX. To avoid crashing the indexer, we clamp the merged value instead
192        // of panicking on overflow. This has the unfortunate consequence of making bugs in the index
193        // harder to detect, but is a necessary trade-off to avoid creating a DOS attack vector.
194        let coin_balance = index_info.coin_balance_delta.clamp(0, u64::MAX as i128) as u64;
195        let address_balance = index_info.address_balance_delta.clamp(0, u64::MAX as i128) as u64;
196        sui_types::storage::BalanceInfo {
197            coin_balance,
198            address_balance,
199        }
200    }
201}
202
/// Key of the `package_version` table; sorted by original package id so all
/// versions of one package are contiguous.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}
208
/// Value of the `package_version` table: the storage id of that version.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}
213
/// Construction-time options for the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Optional compaction filter that auto-prunes old authenticated events.
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}
218
219fn default_table_options() -> typed_store::rocks::DBOptions {
220    typed_store::rocks::default_db_options().disable_write_throttling()
221}
222
223fn events_table_options(
224    compaction_filter: Option<EventsCompactionFilter>,
225) -> typed_store::rocks::DBOptions {
226    let mut options = default_table_options();
227    if let Some(filter) = compaction_filter {
228        options.options.set_compaction_filter(
229            "events_by_stream",
230            move |_, key, value| match filter.filter(key, value) {
231                Ok(decision) => decision,
232                Err(e) => {
233                    warn!(
234                        "Failed to parse event key during compaction: {}, key: {:?}",
235                        e, key
236                    );
237                    Decision::Remove
238                }
239            },
240        );
241    }
242    options
243}
244
245fn balance_delta_merge_operator(
246    _key: &[u8],
247    existing_val: Option<&[u8]>,
248    operands: &MergeOperands,
249) -> Option<Vec<u8>> {
250    let mut result = if let Some(existing_val) = existing_val {
251        bcs::from_bytes::<BalanceIndexInfo>(existing_val)
252            .inspect_err(|e| {
253                tracing::error!(
254                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
255                )
256            })
257            .ok()?
258    } else {
259        BalanceIndexInfo::default()
260    };
261
262    for operand in operands.iter() {
263        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
264            .inspect_err(|e| {
265                tracing::error!(
266                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
267                )
268            })
269            .ok()?;
270        result.merge_delta(&delta);
271    }
272
273    Some(
274        bcs::to_bytes(&result)
275            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
276    )
277}
278
279fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
280    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
281        Ok(info) => info,
282        Err(_) => return Decision::Keep,
283    };
284
285    if balance_info.coin_balance_delta == 0 && balance_info.address_balance_delta == 0 {
286        Decision::Remove
287    } else {
288        Decision::Keep
289    }
290}
291
292fn balance_table_options() -> typed_store::rocks::DBOptions {
293    default_table_options()
294        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
295        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
296}
297
298impl CoinIndexInfo {
299    fn merge(&mut self, other: Self) {
300        self.coin_metadata_object_id = self
301            .coin_metadata_object_id
302            .or(other.coin_metadata_object_id);
303        self.regulated_coin_metadata_object_id = self
304            .regulated_coin_metadata_object_id
305            .or(other.regulated_coin_metadata_object_id);
306        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
307    }
308}
309
/// RocksDB tables for the RpcIndexStore
///
/// Anytime a new table is added, or an existing one has its schema changed, make sure to also
/// update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still be empty post
    ///   initialization)
    /// - version of the DB. Everytime a new table or schema is changed the version number needs to
    ///   be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in the event that the
    /// node was running with indexes enabled, then run for a period of time with indexes disabled,
    /// and then run with them enabled again so that the tables can be reinitialized.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for epochs which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a specific user
    /// account.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by a particular
    /// ObjectID.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the ObjectID of its
    /// corresponding CoinMetadata.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// An index of Balances.
    ///
    /// Allows looking up balances by owner address and coin type.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// An index of Package versions.
    ///
    /// Maps original package ID and version to the storage ID of that version.
    /// Allows efficient listing of all versions of a package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Authenticated events index by (stream_id, checkpoint_seq, transaction_idx, event_index)
    events_by_stream: DBMap<EventIndexKey, ()>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}
391
/// Key of `events_by_stream`; field order makes the serialized key sort by
/// stream, then checkpoint, then settlement version, then position.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    /// The accumulator version that this event is settled into
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}
401
/// Compaction filter for automatic pruning of old authenticated events during RocksDB compaction.
#[derive(Clone)]
pub struct EventsCompactionFilter {
    // Shared watermark: event rows with checkpoint_seq <= this value are removed.
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}
407
408impl EventsCompactionFilter {
409    pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
410        Self { pruning_watermark }
411    }
412
413    pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
414        let event_key: EventIndexKey = bcs::from_bytes(key)?;
415        let watermark = self
416            .pruning_watermark
417            .load(std::sync::atomic::Ordering::Relaxed);
418
419        if event_key.checkpoint_seq <= watermark {
420            Ok(Decision::Remove)
421        } else {
422            Ok(Decision::Keep)
423        }
424    }
425}
426
427impl IndexStoreTables {
428    fn extract_version_if_package(
429        object: &Object,
430    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
431        if let Data::Package(package) = &object.data {
432            let original_id = package.original_package_id();
433            let version = package.version().value();
434            let storage_id = object.id();
435
436            let key = PackageVersionKey {
437                original_package_id: original_id,
438                version,
439            };
440            let info = PackageVersionInfo { storage_id };
441            return Some((key, info));
442        }
443        None
444    }
445
446    fn open_with_index_options<P: Into<PathBuf>>(
447        path: P,
448        index_options: IndexStoreOptions,
449    ) -> Self {
450        // The typed-store derive macro only honors the per-field
451        // `default_options_override_fn` when `tables_db_options_override` is
452        // `None`. As soon as we pass `Some(map)`, any CF missing from the map
453        // silently falls back to bare `default_db_options()` — losing the
454        // `disable_write_throttling` applied by `default_table_options`. To
455        // avoid that, populate the map with `default_table_options()` for every
456        // table before overriding the few that need bespoke configuration.
457        let mut table_options = std::collections::BTreeMap::new();
458        for (table_name, _) in IndexStoreTables::describe_tables() {
459            table_options.insert(table_name, default_table_options());
460        }
461        table_options.insert("balance".to_string(), balance_table_options());
462        table_options.insert(
463            "events_by_stream".to_string(),
464            events_table_options(index_options.events_compaction_filter),
465        );
466
467        IndexStoreTables::open_tables_read_write_with_deprecation_option(
468            path.into(),
469            MetricConf::new("rpc-index"),
470            None,
471            Some(DBMapTableConfigMap::new(table_options)),
472            true, // remove deprecated tables
473        )
474    }
475
476    fn open_with_options<P: Into<PathBuf>>(
477        path: P,
478        options: typed_store::rocksdb::Options,
479        table_options: Option<DBMapTableConfigMap>,
480    ) -> Self {
481        IndexStoreTables::open_tables_read_write_with_deprecation_option(
482            path.into(),
483            MetricConf::new("rpc-index"),
484            Some(options),
485            table_options,
486            true, // remove deprecated tables
487        )
488    }
489
490    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
491        (match self.meta.get(&()) {
492            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
493            Ok(None) => true,
494            Err(_) => true,
495        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
496    }
497
498    // Check if the index watermark is behind the highets_executed watermark.
499    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
500        let highest_executed_checkpint = checkpoint_store
501            .get_highest_executed_checkpoint_seq_number()
502            .ok()
503            .flatten();
504        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
505        watermark < highest_executed_checkpint
506    }
507
    /// One-time (re)initialization of the index tables.
    ///
    /// Backfills epoch data for the checkpoints still available, seeds the
    /// current epoch entry, indexes the live object set in parallel, and
    /// finally records the `Indexed` watermark and the DB version so later
    /// startups can skip this work.
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        // `None` until the genesis checkpoint has been executed.
        let highest_executed_checkpint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        // Lowest checkpoint whose contents are still present (pruned + 1).
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Doing backfill requires processing objects so we have to restrict our backfill range
        // to the range of checkpoints that we have objects for.
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
            lowest_available_checkpoint..=highest_executed_checkpint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_checkpoints(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        // Only index live objects if genesis checkpoint has been executed.
        // If genesis hasn't been executed yet, the objects will be properly indexed
        // as checkpoints are processed through the normal checkpoint execution path.
        if highest_executed_checkpint.is_some() {
            // Coin metadata discovered by the parallel workers is accumulated
            // here and written in one shot once all workers finish.
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpint.unwrap_or(0),
        )?;

        // Writing the version last marks initialization as complete.
        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }
587
    /// Backfills epoch metadata for every checkpoint in `checkpoint_range`.
    ///
    /// Runs in parallel via rayon; each checkpoint is loaded in a sparse form
    /// and only `index_epoch` is applied, with each result committed in its
    /// own WAL-less batch (see `bulk_ingestion_write_options`).
    #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
    fn index_existing_checkpoints(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        checkpoint_range: std::ops::RangeInclusive<u64>,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!(
            "Indexing {} checkpoints in range {checkpoint_range:?}",
            checkpoint_range.size_hint().0
        );
        let start_time = Instant::now();

        checkpoint_range.into_par_iter().try_for_each(|seq| {
            let load_events = rpc_config.authenticated_events_indexing();
            // `None` means there is no data for this checkpoint; skip it.
            let Some(checkpoint_data) = sparse_checkpoint_data_for_epoch_backfill(
                authority_store,
                checkpoint_store,
                seq,
                load_events,
            )?
            else {
                return Ok(());
            };

            let mut batch = self.epochs.batch();

            self.index_epoch(&checkpoint_data, &mut batch)?;

            batch
                .write_opt(bulk_ingestion_write_options())
                .map_err(StorageError::from)
        })?;

        info!(
            "Indexing checkpoints took {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }
629
630    /// Prune data from this Index
631    fn prune(
632        &self,
633        pruned_checkpoint_watermark: u64,
634        _checkpoint_contents_to_prune: &[CheckpointContents],
635    ) -> Result<(), TypedStoreError> {
636        let mut batch = self.watermark.batch();
637
638        batch.insert_batch(
639            &self.watermark,
640            [(Watermark::Pruned, pruned_checkpoint_watermark)],
641        )?;
642
643        batch.write()
644    }
645
    /// Index a Checkpoint
    ///
    /// Builds (but does not write) a batch containing epoch, transaction
    /// (balance/event), and object index updates for `checkpoint`, plus an
    /// advance of the `Indexed` watermark. The caller commits the batch.
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        _resolver: &mut dyn LayoutResolver,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.owner.batch();

        self.index_epoch(checkpoint, &mut batch)?;
        self.index_transactions(
            checkpoint,
            &mut batch,
            rpc_config.authenticated_events_indexing(),
        )?;
        self.index_objects(checkpoint, &mut batch)?;

        // The watermark travels in the same batch so it advances atomically
        // with the indexed data.
        batch.insert_batch(
            &self.watermark,
            [(
                Watermark::Indexed,
                checkpoint.checkpoint_summary.sequence_number,
            )],
        )?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }
683
684    fn extract_accumulator_version(
685        &self,
686        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
687    ) -> Option<u64> {
688        let TransactionKind::ProgrammableSystemTransaction(pt) =
689            tx.transaction.transaction_data().kind()
690        else {
691            return None;
692        };
693
694        if pt.shared_input_objects().any(|obj| {
695            obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
696                && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
697        }) {
698            return tx.output_objects.iter().find_map(|obj| {
699                if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
700                    Some(obj.version().value())
701                } else {
702                    None
703                }
704            });
705        }
706
707        None
708    }
709
    /// Indexes `tx`'s authenticated events into `events_by_stream`.
    ///
    /// Only accumulator events carrying `AccumulatorValue::EventDigest` are
    /// indexed, keyed by the stream id derived from the event. Each event
    /// needs the version of the settlement it belongs to; if none has been
    /// seen yet this is a bug (`debug_fatal`) and the event is skipped.
    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                // Events without a derivable stream id are not indexed.
                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    // One index row per event digest position.
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }
757
758    fn index_epoch(
759        &self,
760        checkpoint: &CheckpointData,
761        batch: &mut typed_store::rocks::DBBatch,
762    ) -> Result<(), StorageError> {
763        let Some(epoch_info) = checkpoint.epoch_info()? else {
764            return Ok(());
765        };
766        if epoch_info.epoch > 0 {
767            let prev_epoch = epoch_info.epoch - 1;
768            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
769            current_epoch.epoch = prev_epoch; // set this incase there wasn't an entry
770            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
771            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
772            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
773        }
774        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
775        Ok(())
776    }
777
    // After attempting to reindex past epochs, ensure that the current epoch is at least partially
    // initialized
    fn initialize_current_epoch(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
    ) -> Result<(), StorageError> {
        // No executed checkpoints yet -> nothing to initialize.
        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
            return Ok(());
        };

        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;

        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
        epoch.epoch = checkpoint.epoch;

        // Only backfill fields that are still unset so data written by
        // checkpoint indexing is never overwritten.
        if epoch.protocol_version.is_none() {
            epoch.protocol_version = Some(system_state.protocol_version());
        }

        if epoch.start_timestamp_ms.is_none() {
            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
        }

        if epoch.reference_gas_price.is_none() {
            epoch.reference_gas_price = Some(system_state.reference_gas_price());
        }

        if epoch.system_state.is_none() {
            epoch.system_state = Some(system_state);
        }

        self.epochs.insert(&epoch.epoch, &epoch)?;

        Ok(())
    }
815
    /// Indexes per-transaction data from `checkpoint` into `batch`: balance
    /// deltas (combined later by the `balance` table's merge operator) and,
    /// when `index_events` is set, authenticated events.
    ///
    /// Transactions are walked in REVERSE order so an accumulator
    /// settlement's root version is seen before the earlier transactions
    /// whose events settle into it.
    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        let mut current_accumulator_version: Option<u64> = None;

        // iterate in reverse order, process accumulator settlements first
        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            // Only struct-typed coin types can form a BalanceKey; any other
            // TypeTag is skipped.
            let balance_changes = sui_types::balance_change::derive_detailed_balance_changes(
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
            )
            .into_iter()
            .filter_map(|change| {
                if let TypeTag::Struct(coin_type) = change.coin_type {
                    Some((
                        BalanceKey {
                            owner: change.address,
                            coin_type: *coin_type,
                        },
                        BalanceIndexInfo {
                            coin_balance_delta: change.coin_amount,
                            address_balance_delta: change.address_amount,
                        },
                    ))
                } else {
                    None
                }
            });
            batch.partial_merge_batch(&self.balance, balance_changes)?;

            if index_events {
                // A settlement updates the accumulator root; remember its
                // version for the (earlier) transactions still to come.
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }
868
    /// Index object-level changes for `checkpoint` into `batch`: maintains the owner
    /// index, the dynamic-field index, the coin (metadata/treasury) index, and the
    /// package-version index.
    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            // determine changes from removed objects
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    // Shared and immutable objects are not tracked by the owner or
                    // dynamic-field indexes.
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            // determine changes from changed objects
            for (object, old_object) in tx.changed_objects() {
                // First clear any stale index entries derived from the previous
                // version of the object (its owner or parent may have changed).
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            // Only delete the old dynamic-field entry when ownership
                            // actually changed; otherwise the insert below re-writes
                            // the same key.
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // Then record index entries for the new version of the object.
                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // coin indexing
            //
            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are created in
            // the same transaction so we don't need to worry about overriding any older value
            // that may exist in the database (because there necessarily cannot be).
            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        // Same coin type seen more than once in this checkpoint:
                        // combine the object ids into a single entry.
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
960
    /// Return the stored `EpochInfo` for `epoch`, or `None` if it was never indexed.
    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.epochs.get(&epoch)
    }
964
965    fn event_iter(
966        &self,
967        stream_id: SuiAddress,
968        start_checkpoint: u64,
969        start_accumulator_version: u64,
970        start_transaction_idx: u32,
971        start_event_idx: u32,
972        end_checkpoint: u64,
973        limit: u32,
974    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
975    {
976        let lower = EventIndexKey {
977            stream_id,
978            checkpoint_seq: start_checkpoint,
979            accumulator_version: start_accumulator_version,
980            transaction_idx: start_transaction_idx,
981            event_index: start_event_idx,
982        };
983        let upper = EventIndexKey {
984            stream_id,
985            checkpoint_seq: end_checkpoint,
986            accumulator_version: u64::MAX,
987            transaction_idx: u32::MAX,
988            event_index: u32::MAX,
989        };
990
991        Ok(self
992            .events_by_stream
993            .safe_iter_with_bounds(Some(lower), Some(upper))
994            .map(|res| res.map(|(k, _)| k))
995            .take(limit as usize))
996    }
997
998    fn owner_iter(
999        &self,
1000        owner: SuiAddress,
1001        object_type: Option<StructTag>,
1002        cursor: Option<OwnerIndexKey>,
1003    ) -> Result<
1004        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1005        TypedStoreError,
1006    > {
1007        // TODO can we figure out how to pass a raw byte array as a cursor?
1008        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
1009            owner,
1010            object_type: object_type
1011                .clone()
1012                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
1013            inverted_balance: None,
1014            object_id: ObjectID::ZERO,
1015        });
1016
1017        Ok(self
1018            .owner
1019            .safe_iter_with_bounds(Some(lower_bound), None)
1020            .take_while(move |item| {
1021                // If there's an error let if flow through
1022                let Ok((key, _)) = item else {
1023                    return true;
1024                };
1025
1026                // Only take if owner matches
1027                key.owner == owner
1028                    // and if an object type was supplied that the type matches
1029                    && object_type
1030                        .as_ref()
1031                        .map(|ty| {
1032                            ty.address == key.object_type.address
1033                                && ty.module == key.object_type.module
1034                                && ty.name == key.object_type.name
1035                                // If type_params are not provided then we match all params
1036                                && (ty.type_params.is_empty() ||
1037                                    // If they are provided the type params must match
1038                                    ty.type_params == key.object_type.type_params)
1039                        }).unwrap_or(true)
1040            }))
1041    }
1042
1043    fn dynamic_field_iter(
1044        &self,
1045        parent: ObjectID,
1046        cursor: Option<ObjectID>,
1047    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1048    {
1049        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1050        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1051        let iter = self
1052            .dynamic_field
1053            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1054            .map_ok(|(key, ())| key);
1055        Ok(iter)
1056    }
1057
1058    fn get_coin_info(
1059        &self,
1060        coin_type: &StructTag,
1061    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1062        let key = CoinIndexKey {
1063            coin_type: coin_type.to_owned(),
1064        };
1065        self.coin.get(&key)
1066    }
1067
1068    fn get_balance(
1069        &self,
1070        owner: &SuiAddress,
1071        coin_type: &StructTag,
1072    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1073        let key = BalanceKey {
1074            owner: owner.to_owned(),
1075            coin_type: coin_type.to_owned(),
1076        };
1077        self.balance.get(&key)
1078    }
1079
1080    fn balance_iter(
1081        &self,
1082        owner: SuiAddress,
1083        cursor: Option<BalanceKey>,
1084    ) -> Result<
1085        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1086        TypedStoreError,
1087    > {
1088        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1089            owner,
1090            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1091        });
1092
1093        Ok(self
1094            .balance
1095            .safe_iter_with_bounds(Some(lower_bound), None)
1096            .scan((), move |_, item| {
1097                match item {
1098                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1099                    Ok(_) => None,          // Different owner, stop iteration
1100                    Err(e) => Some(Err(e)), // Propagate error
1101                }
1102            }))
1103    }
1104
1105    fn package_versions_iter(
1106        &self,
1107        original_id: ObjectID,
1108        cursor: Option<u64>,
1109    ) -> Result<
1110        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1111        TypedStoreError,
1112    > {
1113        let lower_bound = PackageVersionKey {
1114            original_package_id: original_id,
1115            version: cursor.unwrap_or(0),
1116        };
1117        let upper_bound = PackageVersionKey {
1118            original_package_id: original_id,
1119            version: u64::MAX,
1120        };
1121
1122        Ok(self
1123            .package_version
1124            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1125    }
1126}
1127
/// Store for the RPC-serving indexes, wrapping the underlying index tables and the
/// per-checkpoint batches that are staged before being committed in order.
pub struct RpcIndexStore {
    tables: IndexStoreTables,
    // Staged write batches keyed by checkpoint sequence number; populated by
    // `index_checkpoint` and drained in order by `commit_update_for_checkpoint`.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}
1133
impl RpcIndexStore {
    /// Given the provided directory, construct the path to the db
    fn db_path(dir: &Path) -> PathBuf {
        dir.join("rpc-index")
    }

    /// Open (and, if required, build) the RPC index store rooted at `dir`, wiring up
    /// the events compaction filter with the provided pruning watermark.
    pub async fn new(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let events_filter = EventsCompactionFilter::new(pruning_watermark);
        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(events_filter),
        };

        Self::new_with_options(
            dir,
            authority_store,
            checkpoint_store,
            epoch_store,
            package_store,
            index_options,
            rpc_config,
        )
        .await
    }

    /// Open the RPC index store with explicit `IndexStoreOptions`.
    ///
    /// If the on-disk tables are missing or at an older version, the db is destroyed
    /// and rebuilt: it is reopened with bulk-load-friendly RocksDB options (unordered
    /// writes, auto-compaction disabled, large write buffers), fully populated via
    /// `IndexStoreTables::init`, flushed, closed, and finally reopened with the normal
    /// runtime options.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            // If the index tables are uninitialized or on an older version then we need to
            // populate them
            if tables.needs_to_do_initialization(checkpoint_store) {
                let batch_size_limit;

                let mut tables = {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Open the empty DB with `unordered_write`s enabled in order to get a ~3x
                    // speedup when indexing
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Allow CPU-intensive flushing operations to use all CPUs.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // We are disabling compaction for all column families below. This means we can
                    // also disable the backpressure that slows down writes when the number of L0
                    // files builds up since we will never compact them anyway.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // This is an upper bound on the amount of RAM the memtables can use across
                    // all column families.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        // Default to 80% of system RAM
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Create column family specific options.
                    let mut table_config_map = BTreeMap::new();

                    // Create options with compactions disabled and large write buffers.
                    // Each CF can use up to 25% of system RAM, but total is still limited by
                    // set_db_write_buffer_size configured above.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Calculate buffer configuration: 25% of RAM split across buffers
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB minimum

                            // Target number of buffers based on CPU count
                            // More CPUs = more parallel flushing capability
                            let target_buffer_count = num_cpus::get().max(2);

                            // Aim for CPU-based buffer count, but reduce if it would make buffers too small
                            //   For example:
                            // - 128GB RAM, 32 CPUs: 32GB per CF / 32 buffers = 1GB each
                            // - 16GB RAM, 8 CPUs: 4GB per CF / 8 buffers = 512MB each
                            // - 4GB RAM, 8 CPUs: 1GB per CF / 64MB min = ~16 buffers of 64MB each
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Calculate batch size limit: default to half the buffer size or 128MB, whichever is smaller
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27; // 128MB
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply cf_options to all tables
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // Override Balance options with the merge operator
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                // Flush all data to disk before dropping tables.
                // This is critical because WAL is disabled during bulk indexing.
                // Note we only need to call flush on one table because all tables share the same
                // underlying database.
                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Wait for every outstanding reference to the underlying db to be
                // released before reopening it at the same path.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen the DB with default options (eg without `unordered_write`s enabled)
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: verify the database version was persisted correctly
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }

    /// Open the db at `dir` with default options, skipping the initialization /
    /// reindexing check entirely.
    pub fn new_without_init(dir: &Path) -> Self {
        let path = Self::db_path(dir);
        let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config: sui_config::RpcConfig::default(),
        }
    }

    /// Prune index data for the provided checkpoint contents up to
    /// `pruned_checkpoint_watermark`.
    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }

    /// Index a checkpoint and stage the index updated in `pending_updates`.
    ///
    /// Updates will not be committed to the database until `commit_update_for_checkpoint` is
    /// called.
    #[tracing::instrument(
        skip_all,
        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
    )]
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver, &self.rpc_config)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

    /// Commits the pending updates for the provided checkpoint number.
    ///
    /// Invariants:
    /// - `index_checkpoint` must have been called for the provided checkpoint
    /// - Callers of this function must ensure that it is called for each checkpoint in sequential
    ///   order. This will panic if the provided checkpoint does not match the expected next
    ///   checkpoint to commit.
    #[tracing::instrument(skip(self))]
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        // It's expected that the next batch exists
        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    /// Return the stored `EpochInfo` for `epoch`, if it was indexed.
    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }

    /// Iterate owned-object index entries for `owner`; see `IndexStoreTables::owner_iter`.
    pub fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, object_type, cursor)
    }

    /// Iterate dynamic-field keys under `parent`, resuming from `cursor`.
    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    /// Look up the coin index entry for `coin_type`.
    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }

    /// Fetch the indexed balance entry for the given owner and coin type.
    pub fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        self.tables.get_balance(owner, coin_type)
    }

    /// Iterate balance entries for `owner`, resuming from `cursor`.
    pub fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.balance_iter(owner, cursor)
    }

    /// Iterate recorded versions of the package whose original id is `original_id`.
    pub fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.package_versions_iter(original_id, cursor)
    }

    /// Iterate event index keys for `stream_id` within the given checkpoint range;
    /// see `IndexStoreTables::event_iter`.
    pub fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.event_iter(
            stream_id,
            start_checkpoint,
            start_accumulator_version,
            start_transaction_idx,
            start_event_idx,
            end_checkpoint,
            limit,
        )
    }

    /// Return the sequence number of the highest checkpoint whose index updates have
    /// been recorded, if any.
    pub fn get_highest_indexed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables.watermark.get(&Watermark::Indexed)
    }
}
1563
1564fn should_index_dynamic_field(object: &Object) -> bool {
1565    // Skip any objects that aren't of type `Field<Name, Value>`
1566    //
1567    // All dynamic fields are of type:
1568    //   - Field<Name, Value> for dynamic fields
1569    //   - Field<Wrapper<Name>, ID>> for dynamic field objects where the ID is the id of the pointed
1570    //   to object
1571    //
1572    object
1573        .data
1574        .try_as_move()
1575        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
1576}
1577
1578fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
1579    use sui_types::coin::CoinMetadata;
1580    use sui_types::coin::RegulatedCoinMetadata;
1581    use sui_types::coin::TreasuryCap;
1582
1583    let object_type = object.type_().and_then(MoveObjectType::other)?;
1584
1585    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
1586        return Some((
1587            CoinIndexKey { coin_type },
1588            CoinIndexInfo {
1589                coin_metadata_object_id: Some(object.id()),
1590                treasury_object_id: None,
1591                regulated_coin_metadata_object_id: None,
1592            },
1593        ));
1594    }
1595
1596    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
1597        return Some((
1598            CoinIndexKey { coin_type },
1599            CoinIndexInfo {
1600                coin_metadata_object_id: None,
1601                treasury_object_id: Some(object.id()),
1602                regulated_coin_metadata_object_id: None,
1603            },
1604        ));
1605    }
1606
1607    if let Some(coin_type) =
1608        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
1609    {
1610        return Some((
1611            CoinIndexKey { coin_type },
1612            CoinIndexInfo {
1613                coin_metadata_object_id: None,
1614                treasury_object_id: None,
1615                regulated_coin_metadata_object_id: Some(object.id()),
1616            },
1617        ));
1618    }
1619
1620    None
1621}
1622
/// Shared state for indexing the live object set in parallel. Implements
/// `ParMakeLiveObjectIndexer` so the parallel walker can create one
/// `RpcLiveObjectIndexer` per worker from it.
struct RpcParLiveObjectSetIndexer<'a> {
    // Index tables all workers write into (each through its own batch).
    tables: &'a IndexStoreTables,
    // Coin metadata/treasury index accumulated across all workers; guarded by
    // a mutex because workers merge into it concurrently.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Byte threshold at which a worker's write batch is flushed to the DB.
    batch_size_limit: usize,
}
1628
/// Per-worker indexer over the live object set: buffers index writes in a
/// RocksDB batch plus an in-memory balance-delta map, flushing each once its
/// respective size threshold is reached.
struct RpcLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    // Pending index writes; written out once it exceeds `batch_size_limit`.
    batch: typed_store::rocks::DBBatch,
    // Shared coin index map, merged into under the lock in `index_object`.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Buffered balance deltas, merged into `batch` once the map holds
    // `BALANCE_FLUSH_THRESHOLD` entries (and drained fully in `finish`).
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    // Byte threshold at which `batch` is written to the DB.
    batch_size_limit: usize,
}
1636
impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RpcLiveObjectIndexer<'a>;

    /// Create a fresh per-worker indexer: each worker gets its own write batch
    /// and balance-delta buffer while sharing `tables` and the mutex-guarded
    /// coin index with every other worker.
    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RpcLiveObjectIndexer {
            tables: self.tables,
            // Any table handle can seed the batch; the same batch later
            // receives writes for other tables too (see `index_object`).
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            balance_changes: HashMap::new(),
            batch_size_limit: self.batch_size_limit,
        }
    }
}
1650
1651impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
1652    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
1653        match object.owner {
1654            // Owner Index
1655            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
1656                let owner_key = OwnerIndexKey::from_object(&object);
1657                let owner_info = OwnerIndexInfo::new(&object);
1658                self.batch
1659                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
1660
1661                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
1662                    let balance_key = BalanceKey { owner, coin_type };
1663                    let balance_info = BalanceIndexInfo {
1664                        coin_balance_delta: value.into(),
1665                        address_balance_delta: 0,
1666                    };
1667                    self.balance_changes
1668                        .entry(balance_key)
1669                        .or_default()
1670                        .merge_delta(&balance_info);
1671
1672                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1673                        self.batch.partial_merge_batch(
1674                            &self.tables.balance,
1675                            std::mem::take(&mut self.balance_changes),
1676                        )?;
1677                    }
1678                }
1679            }
1680
1681            // Dynamic Field Index
1682            Owner::ObjectOwner(parent) => {
1683                if should_index_dynamic_field(&object) {
1684                    let field_key = DynamicFieldKey::new(parent, object.id());
1685                    self.batch
1686                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
1687                }
1688
1689                // Index address balances
1690                if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
1691                    && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
1692                {
1693                    let balance_key = BalanceKey { owner, coin_type };
1694                    let balance_info = BalanceIndexInfo {
1695                        coin_balance_delta: 0,
1696                        address_balance_delta: balance,
1697                    };
1698                    self.balance_changes
1699                        .entry(balance_key)
1700                        .or_default()
1701                        .merge_delta(&balance_info);
1702
1703                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1704                        self.batch.partial_merge_batch(
1705                            &self.tables.balance,
1706                            std::mem::take(&mut self.balance_changes),
1707                        )?;
1708                    }
1709                }
1710            }
1711
1712            Owner::Shared { .. } | Owner::Immutable => {}
1713        }
1714
1715        // Look for CoinMetadata<T> and TreasuryCap<T> objects
1716        if let Some((key, value)) = try_create_coin_index_info(&object) {
1717            use std::collections::hash_map::Entry;
1718
1719            match self.coin_index.lock().unwrap().entry(key) {
1720                Entry::Occupied(mut o) => {
1721                    o.get_mut().merge(value);
1722                }
1723                Entry::Vacant(v) => {
1724                    v.insert(value);
1725                }
1726            }
1727        }
1728
1729        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
1730            self.batch
1731                .insert_batch(&self.tables.package_version, [(key, info)])?;
1732        }
1733
1734        // If the batch size grows to greater than the limit then write out to the DB so that the
1735        // data we need to hold in memory doesn't grow unbounded.
1736        if self.batch.size_in_bytes() >= self.batch_size_limit {
1737            std::mem::replace(&mut self.batch, self.tables.owner.batch())
1738                .write_opt(bulk_ingestion_write_options())?;
1739        }
1740
1741        Ok(())
1742    }
1743
1744    fn finish(mut self) -> Result<(), StorageError> {
1745        self.batch.partial_merge_batch(
1746            &self.tables.balance,
1747            std::mem::take(&mut self.balance_changes),
1748        )?;
1749        self.batch.write_opt(bulk_ingestion_write_options())?;
1750        Ok(())
1751    }
1752}
1753
1754// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit of refactoring to
1755// make it possible.
/// Reconstruct full `CheckpointData` for `checkpoint` from local stores, but
/// only for checkpoints relevant to epoch backfill: genesis (sequence 0) and
/// end-of-epoch checkpoints. All other checkpoints return `Ok(None)` after
/// loading just the summary, so callers can skip them cheaply.
///
/// When `load_events` is false, per-transaction events are left as `None`
/// placeholders instead of being fetched.
///
/// Errors if the checkpoint summary/contents, or any transaction or effects
/// referenced by the checkpoint, are missing from the local stores.
fn sparse_checkpoint_data_for_epoch_backfill(
    authority_store: &AuthorityStore,
    checkpoint_store: &CheckpointStore,
    checkpoint: u64,
    load_events: bool,
) -> Result<Option<CheckpointData>, StorageError> {
    use sui_types::full_checkpoint_content::CheckpointTransaction;

    let summary = checkpoint_store
        .get_checkpoint_by_sequence_number(checkpoint)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;

    // Only load genesis and end of epoch checkpoints
    if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
        return Ok(None);
    }

    let contents = checkpoint_store
        .get_checkpoint_contents(&summary.content_digest)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;

    // Digests in checkpoint order; all per-transaction lookups below preserve
    // this order so the zips line up.
    let transaction_digests = contents
        .iter()
        .map(|execution_digests| execution_digests.transaction)
        .collect::<Vec<_>>();
    let transactions = authority_store
        .multi_get_transaction_blocks(&transaction_digests)?
        .into_iter()
        .map(|maybe_transaction| {
            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
        })
        .collect::<Result<Vec<_>, _>>()?;

    let effects = authority_store
        .multi_get_executed_effects(&transaction_digests)?
        .into_iter()
        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
        .collect::<Result<Vec<_>, _>>()?;

    let events = if load_events {
        authority_store
            .multi_get_events(&transaction_digests)
            .map_err(|e| StorageError::custom(e.to_string()))?
    } else {
        // Same length as the digests so the zip below still pairs correctly.
        vec![None; transaction_digests.len()]
    };

    // Pair each transaction with its effects and events (zip_debug_eq checks
    // the iterators are equal-length in debug builds).
    let mut full_transactions = Vec::with_capacity(transactions.len());
    for ((tx, fx), ev) in transactions
        .into_iter()
        .zip_debug_eq(effects)
        .zip_debug_eq(events)
    {
        let input_objects =
            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
        let output_objects =
            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;

        let full_transaction = CheckpointTransaction {
            transaction: tx.into(),
            effects: fx,
            events: ev,
            input_objects,
            output_objects,
        };

        full_transactions.push(full_transaction);
    }

    let checkpoint_data = CheckpointData {
        checkpoint_summary: summary.into(),
        checkpoint_contents: contents,
        transactions: full_transactions,
    };

    Ok(Some(checkpoint_data))
}
1833
1834fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
1835    match Coin::extract_balance_if_coin(object) {
1836        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
1837        Ok(Some(_)) => {
1838            debug!("Coin object {} has non-struct type tag", object.id());
1839            Ok(None)
1840        }
1841        Ok(None) => {
1842            // Not a coin
1843            Ok(None)
1844        }
1845        Err(e) => {
1846            // Corrupted coin data
1847            Err(StorageError::custom(format!(
1848                "Failed to deserialize coin object {}: {}",
1849                object.id(),
1850                e
1851            )))
1852        }
1853    }
1854}
1855
1856fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
1857    let move_object = object.data.try_as_move()?;
1858
1859    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
1860    else {
1861        return None;
1862    };
1863
1864    let (key, value): (
1865        sui_types::accumulator_root::AccumulatorKey,
1866        sui_types::accumulator_root::AccumulatorValue,
1867    ) = move_object.try_into().ok()?;
1868
1869    let balance = value.as_u128()? as i128;
1870    if balance <= 0 {
1871        return None;
1872    }
1873
1874    Some((key.owner, *coin_type, balance))
1875}
1876
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    /// End-to-end check that `EventsCompactionFilter` drops entries during an
    /// actual RocksDB compaction of the `events_by_stream` CF.
    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        // Watermark of 5: events at checkpoints <= 5 are prunable (the
        // boundary is inclusive, see test_events_compaction_filter_logic).
        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        // Checkpoints straddle the watermark: 1, 3, 5 prunable; 10, 15 kept.
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        // Flush the memtable so the data sits in SST files compaction can see.
        tables.events_by_stream.flush().unwrap();
        let mut events_before_compaction = 0;
        for result in tables.events_by_stream.safe_iter() {
            if result.is_ok() {
                events_before_compaction += 1;
            }
        }
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        // NOTE(review): the end key uses a *random* stream id; if it happens to
        // sort below `stream_id`, the compacted range may not cover the test
        // keys, so the weak `>= 2` assertion below could hide a no-op
        // compaction. Consider a max-valued address here — TODO confirm.
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        println!("Events after compaction: {}", events_after_compaction.len());
        // Checkpoints 10 and 15 must survive; `>= 2` rather than `== 2`
        // tolerates compaction not visiting the whole range.
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        // The filter reads the watermark through the shared atomic, so it can
        // be raised after construction without rebuilding the filter.
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    /// Unit test of the filter decision itself: remove at or below the
    /// watermark, keep strictly above it.
    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        // Below the watermark: prunable.
        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        // Above the watermark: retained.
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        // Boundary: equal to the watermark is treated as prunable (inclusive).
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }

    /// Every column family opened via `open_with_index_options` must have the
    /// `disable_write_throttling` override applied. The typed-store derive
    /// macro silently falls back to bare `default_db_options()` for any CF
    /// missing from `tables_db_options_override`, reverting to RocksDB's
    /// default stall triggers (slowdown=20, stop=36) — small enough that ~80
    /// L0 files would stop writes entirely. RocksDB persists the effective
    /// per-CF options to an `OPTIONS-NNNNNN` file at open; parse it to verify
    /// every CF received the override.
    #[tokio::test]
    async fn open_with_index_options_overrides_every_cf() {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("rpc-index");

        let _tables =
            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());

        // Iterate the CFs RocksDB actually wrote to the OPTIONS file rather
        // than the schema-declared set, since deprecated CFs are dropped at
        // open time and never appear in OPTIONS. RocksDB always writes a
        // `default` CF; we exclude it because typed-store doesn't store data
        // there and we don't configure it.
        let per_cf = parse_cf_options(&db_path);
        assert!(
            !per_cf.is_empty(),
            "expected at least one CFOptions section in OPTIONS file"
        );
        for (cf_name, opts) in &per_cf {
            if cf_name == "default" {
                continue;
            }
            for (key, expected) in [
                ("level0_slowdown_writes_trigger", "512"),
                ("level0_stop_writes_trigger", "1024"),
                ("soft_pending_compaction_bytes_limit", "0"),
                ("hard_pending_compaction_bytes_limit", "0"),
            ] {
                let actual = opts
                    .get(key)
                    .unwrap_or_else(|| panic!("cf `{cf_name}` missing `{key}`"));
                assert_eq!(
                    actual, expected,
                    "cf `{cf_name}` has `{key}={actual}`, expected `{expected}` — \
                     the typed-store override map likely doesn't cover this CF"
                );
            }
        }
    }

    /// Parse the newest `OPTIONS-NNNNNN` file in `db_path` into a map keyed by
    /// column family name. RocksDB writes one such file on every open with a
    /// section per CF in INI-like format.
    fn parse_cf_options(db_path: &Path) -> HashMap<String, HashMap<String, String>> {
        // Pick the OPTIONS file with the highest sequence number (newest open).
        let mut options_file: Option<(u64, PathBuf)> = None;
        for entry in std::fs::read_dir(db_path).expect("read_dir failed") {
            let entry = entry.unwrap();
            let name = entry.file_name().to_string_lossy().into_owned();
            let Some(rest) = name.strip_prefix("OPTIONS-") else {
                continue;
            };
            // Skip transient files like `OPTIONS-NNNNNN.dbtmp`.
            let Ok(seq) = rest.parse::<u64>() else {
                continue;
            };
            if options_file.as_ref().is_none_or(|(s, _)| seq > *s) {
                options_file = Some((seq, entry.path()));
            }
        }
        let (_, path) = options_file.expect("no OPTIONS-* file written");
        let content = std::fs::read_to_string(&path).expect("read OPTIONS failed");

        // Minimal INI-style parse: `[CFOptions "name"]` opens a CF section;
        // `k=v` lines inside it are recorded; any other section header closes it.
        let mut result: HashMap<String, HashMap<String, String>> = HashMap::new();
        let mut current_cf: Option<String> = None;
        for line in content.lines() {
            let line = line.trim();
            if let Some(rest) = line.strip_prefix("[CFOptions \"") {
                let cf_name = rest.trim_end_matches("\"]").to_string();
                current_cf = Some(cf_name);
            } else if line.starts_with('[') {
                // Any other section ends the CFOptions block.
                current_cf = None;
            } else if let Some(cf) = current_cf.as_ref()
                && let Some((k, v)) = line.split_once('=')
            {
                result
                    .entry(cf.clone())
                    .or_default()
                    .insert(k.trim().to_string(), v.trim().to_string());
            }
        }
        result
    }
}