sui_core/
rpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use roaring::RoaringBitmap;
15use rustc_hash::{FxHashMap, FxHashSet};
16use serde::Deserialize;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use std::collections::{BTreeMap, HashMap};
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::sync::Mutex;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26use std::time::Duration;
27use std::time::Instant;
28use sui_inverted_index::encode_dimension_key;
29use sui_inverted_index::for_each_event_dimension;
30use sui_inverted_index::for_each_transaction_dimension;
31use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
32use sui_types::base_types::MoveObjectType;
33use sui_types::base_types::ObjectID;
34use sui_types::base_types::SequenceNumber;
35use sui_types::base_types::SuiAddress;
36use sui_types::coin::Coin;
37use sui_types::committee::EpochId;
38use sui_types::digests::TransactionDigest;
39use sui_types::effects::TransactionEffectsAPI;
40use sui_types::full_checkpoint_content::Checkpoint;
41use sui_types::full_checkpoint_content::ExecutedTransaction;
42use sui_types::full_checkpoint_content::ObjectSet;
43use sui_types::messages_checkpoint::CheckpointSequenceNumber;
44use sui_types::object::Data;
45use sui_types::object::Object;
46use sui_types::object::Owner;
47use sui_types::storage::BackingPackageStore;
48use sui_types::storage::DynamicFieldKey;
49use sui_types::storage::EpochInfo;
50use sui_types::storage::LedgerBitmapBucket;
51use sui_types::storage::LedgerBitmapBucketIterator;
52use sui_types::storage::LedgerTxSeqDigest;
53use sui_types::storage::LedgerTxSeqDigestIterator;
54use sui_types::storage::ObjectKey;
55use sui_types::storage::TransactionInfo;
56use sui_types::storage::error::Error as StorageError;
57use sui_types::sui_system_state::SuiSystemStateTrait;
58use sui_types::transaction::TransactionDataAPI;
59use sysinfo::{MemoryRefreshKind, RefreshKind, System};
60use tracing::{debug, info, warn};
61use typed_store::DBMapUtils;
62use typed_store::TypedStoreError;
63use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
64use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
65use typed_store::traits::Map;
66
// On-disk schema version. Must be bumped whenever a table is added, a table's
// schema changes, or one of the bitmap constants below changes.
const CURRENT_DB_VERSION: u64 = 4;

// I tried increasing this to 100k and 1M and it didn't speed up indexing at all.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;

// Bitmap inverted index constants
// A change to these constants requires bumping CURRENT_DB_VERSION
// 2^16: number of tx_seqs covered by a single transaction-bitmap bucket.
const TX_BUCKET_SIZE: u64 = 65_536;
// 2^28: 4,096 transactions per bucket. Must match sui-kvstore's
// `event_bitmap_index::BUCKET_SIZE` and the reader's `EVENT_BITMAP_BUCKET_SIZE`.
const EVENT_BUCKET_SIZE: u64 = 268_435_456;
// Number of low bits of a packed event_seq that hold the event index within
// its transaction; the remaining high bits hold the tx_seq.
const EVENT_BITS: u32 = 16;
// Exclusive upper bound on the event index that fits in `EVENT_BITS` bits.
const MAX_EVENTS_PER_TX: u32 = 1 << EVENT_BITS;
// Largest tx_seq that can be shifted left by `EVENT_BITS` without losing bits.
const MAX_TX_SEQ: u64 = u64::MAX >> EVENT_BITS;

// Compile-time sanity checks: bucket sizes fit in `u32`, the shifts by
// `EVENT_BITS` are in range, and an event bucket holds a whole number of
// transactions.
const _: () = assert!(TX_BUCKET_SIZE <= u32::MAX as u64);
const _: () = assert!(EVENT_BUCKET_SIZE <= u32::MAX as u64);
const _: () = assert!(EVENT_BITS < u32::BITS);
const _: () = assert!(EVENT_BITS < u64::BITS);
const _: () = assert!(MAX_EVENTS_PER_TX as u64 == 1u64 << EVENT_BITS);
const _: () = assert!(EVENT_BUCKET_SIZE.is_multiple_of(MAX_EVENTS_PER_TX as u64));
88
89fn checked_encode_event_seq(tx_seq: u64, event_idx: u32) -> Result<u64, StorageError> {
90    if event_idx >= MAX_EVENTS_PER_TX {
91        return Err(StorageError::custom(format!(
92            "event_idx {event_idx} exceeds packed event-seq limit {}",
93            MAX_EVENTS_PER_TX - 1
94        )));
95    }
96    if tx_seq > MAX_TX_SEQ {
97        return Err(StorageError::custom(format!(
98            "tx_seq {tx_seq} exceeds packed event-seq limit {MAX_TX_SEQ}"
99        )));
100    }
101    Ok((tx_seq << EVENT_BITS) | (event_idx as u64))
102}
103
104/// Lowest packed event_seq for a given `tx_seq` (idx 0), as an `Option` so
105/// the compaction filter (and other untrusted-input callers) get a clean
106/// `None` instead of an overflowing shift when `tx_seq > MAX_TX_SEQ`.
107fn checked_event_seq_lo(tx_seq: u64) -> Option<u64> {
108    if tx_seq <= MAX_TX_SEQ {
109        Some(tx_seq << EVENT_BITS)
110    } else {
111        None
112    }
113}
114
115fn bulk_ingestion_write_options() -> WriteOptions {
116    let mut opts = WriteOptions::default();
117    opts.disable_wal(true);
118    opts
119}
120
121/// Get available memory, respecting cgroup limits in containerized environments
122fn get_available_memory() -> u64 {
123    // RefreshKind::nothing().with_memory() avoids collecting other, slower stats
124    let mut sys = System::new_with_specifics(
125        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
126    );
127    sys.refresh_memory();
128
129    // Check if we have cgroup limits
130    if let Some(cgroup_limits) = sys.cgroup_limits() {
131        let memory_limit = cgroup_limits.total_memory;
132        // cgroup_limits.total_memory is 0 when there's no limit
133        if memory_limit > 0 {
134            debug!("Using cgroup memory limit: {} bytes", memory_limit);
135            return memory_limit;
136        }
137    }
138
139    // Fall back to system memory if no cgroup limits found
140    // sysinfo 0.35 already reports bytes (not KiB like older versions)
141    let total_memory_bytes = sys.total_memory();
142    debug!("Using system memory: {} bytes", total_memory_bytes);
143    total_memory_bytes
144}
145
/// Value of the singleton row in the `meta` table, recording the on-disk
/// schema version of the index database.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}
151
/// Per-DB rpc-index settings, persisted in their own column family so the
/// schema `version` stays a clean monotonic number rather than carrying packed
/// feature bits.
///
/// Stored as JSON inside an opaque `Vec<u8>` value (see [`encode_settings`])
/// rather than as a bcs-encoded struct, so the settings can evolve without a
/// schema break: every field carries `#[serde(default)]`, so a newer binary
/// reading an older DB fills missing fields with defaults, and an older binary
/// reading a newer DB ignores fields it doesn't know — safe in both upgrade
/// directions. New flags added here must also carry `#[serde(default)]` for
/// that property to keep holding.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
struct IndexSettings {
    /// Whether this DB was built with ledger-history indexing enabled.
    /// Defaults to `false` when absent from the stored JSON.
    #[serde(default)]
    ledger_history_indexing: bool,
}
169
170/// Serialize settings to the JSON bytes stored in the `settings` CF. JSON of a
171/// plain flag struct cannot fail, so this does not return a `Result`.
172fn encode_settings(settings: &IndexSettings) -> Vec<u8> {
173    serde_json::to_vec(settings).expect("IndexSettings is always JSON-serializable")
174}
175
176/// Decode `settings`-CF JSON bytes, defaulting on absent/corrupt data. With
177/// `#[serde(default)]` on every field, schema evolution never reaches the
178/// default-on-error path; only genuine corruption does.
179fn decode_settings(bytes: &[u8]) -> IndexSettings {
180    serde_json::from_slice(bytes).unwrap_or_default()
181}
182
/// Checkpoint watermark type.
///
/// Used as the key of the `watermark` table, whose values are checkpoint
/// sequence numbers.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Watermark for the highest indexed checkpoint.
    Indexed,
    /// Watermark for pruning — presumably the highest pruned checkpoint;
    /// confirm against the `prune` logic.
    Pruned,
}
189
/// Key of the `owner` table: one row per address-owned object.
///
/// The derived `Ord` compares fields in declaration order (owner, then type,
/// then inverted balance, then object id), so the field order is significant.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    /// Address that owns the object.
    pub owner: SuiAddress,

    /// Move struct type of the object.
    pub object_type: StructTag,

    // If this object is coin-like (eg 0x2::coin::Coin) then this will be the balance of the coin
    // inverted `!coin.balance` in order to force sorting of coins to be from greatest to least
    pub inverted_balance: Option<u64>,

    /// ID of the owned object.
    pub object_id: ObjectID,
}
202
203impl OwnerIndexKey {
204    // Creates a key from the provided object.
205    // Panics if the provided object is not an Address owned object
206    fn from_object(object: &Object) -> Self {
207        let owner = match object.owner() {
208            Owner::AddressOwner(owner) => owner,
209            Owner::ConsensusAddressOwner { owner, .. } => owner,
210            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
211        };
212        let object_type = object.struct_tag().expect("packages cannot be owned");
213
214        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
215
216        Self {
217            owner: *owner,
218            object_type,
219            inverted_balance,
220            object_id: object.id(),
221        }
222    }
223}
224
/// Value of the `owner` table, paired with an `OwnerIndexKey`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // object_id and type of this object are a part of the key
    /// Version of the object recorded in the index.
    pub version: SequenceNumber,
}
230
231impl OwnerIndexInfo {
232    pub fn new(object: &Object) -> Self {
233        Self {
234            version: object.version(),
235        }
236    }
237}
238
/// Key of the `coin` table: a coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// Struct tag identifying the coin type.
    coin_type: StructTag,
}
243
/// Key of the `balance` table: balances are looked up by owner address and
/// coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    /// Address holding the balance.
    pub owner: SuiAddress,
    /// Coin type the balance is denominated in.
    pub coin_type: StructTag,
}
249
/// Value of the `coin` table: object IDs associated with a coin type. Each
/// field is `None` until the corresponding object is known.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    /// ObjectID of the coin's `CoinMetadata`, if known.
    pub coin_metadata_object_id: Option<ObjectID>,
    /// ObjectID of the coin's treasury object, if known.
    pub treasury_object_id: Option<ObjectID>,
    /// ObjectID of the coin's regulated-coin metadata, if known.
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
256
/// Value of the `balance` table, and also the operand format of its merge
/// operator. Both fields are signed deltas so that positive and negative
/// updates can be summed; the default value is the zero delta.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    /// Accumulated change in the coin balance.
    pub coin_balance_delta: i128,
    /// Accumulated change in the address balance.
    pub address_balance_delta: i128,
}
262
263impl BalanceIndexInfo {
264    fn merge_delta(&mut self, other: &Self) {
265        self.coin_balance_delta = self
266            .coin_balance_delta
267            .saturating_add(other.coin_balance_delta);
268        self.address_balance_delta = self
269            .address_balance_delta
270            .saturating_add(other.address_balance_delta);
271    }
272}
273
274impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
275    fn from(index_info: BalanceIndexInfo) -> Self {
276        // Note: We represent balance deltas as i128 to simplify merging positive and negative updates.
277        // Be aware: Move doesn’t enforce a one-time-witness (OTW) pattern when creating a Supply<T>.
278        // Anyone can call `sui::balance::create_supply` and mint unbounded supply, potentially pushing
279        // total balances over u64::MAX. To avoid crashing the indexer, we clamp the merged value instead
280        // of panicking on overflow. This has the unfortunate consequence of making bugs in the index
281        // harder to detect, but is a necessary trade-off to avoid creating a DOS attack vector.
282        let coin_balance = index_info.coin_balance_delta.clamp(0, u64::MAX as i128) as u64;
283        let address_balance = index_info.address_balance_delta.clamp(0, u64::MAX as i128) as u64;
284        sui_types::storage::BalanceInfo {
285            coin_balance,
286            address_balance,
287        }
288    }
289}
290
/// Key of the `package_version` table: a package's original ID plus one of
/// its versions. The derived `Ord` sorts by `original_package_id` first, then
/// `version`.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    /// Original package ID of the package.
    pub original_package_id: ObjectID,
    /// Version number of this package version.
    pub version: u64,
}
296
/// Value of the `package_version` table.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    /// Storage ID (object ID) of this specific package version.
    pub storage_id: ObjectID,
}
301
/// Row of the `tx_seq_digest` table — direct mapping from `tx_sequence_number`
/// to `(digest, event_count, tx_offset, checkpoint_number)`. `event_count` lets
/// event listings enumerate a transaction's event_seqs without rereading the tx
/// row. `tx_offset` is the transaction's zero-based position within its
/// checkpoint, letting readers report a `(checkpoint, tx_offset)` coordinate
/// without recomputing it from the checkpoint's base sequence number.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct TxSeqDigestInfo {
    /// Digest of the transaction.
    pub digest: TransactionDigest,
    /// Number of events emitted by the transaction.
    pub event_count: u32,
    /// Zero-based position of the transaction within its checkpoint.
    pub tx_offset: u32,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_number: CheckpointSequenceNumber,
}
315
/// Row key for both bitmap CFs.
///
/// `dimension_key` is `[tag_byte][value_bytes]` per `sui-inverted-index`.
/// `bucket_id` is the integer division `seq / BUCKET_SIZE` for whichever
/// sequence space the CF is keyed by (tx_seq for `transaction_bitmap`, packed
/// event_seq for `event_bitmap`).
///
/// typed-store encodes keys with bincode `with_big_endian().with_fixint_encoding()`,
/// so the on-disk layout is:
///   [8 B BE length(dimension_key)] [dimension_key bytes] [8 B BE bucket_id]
/// Within a fixed `dimension_key`, range scans over `bucket_id` are
/// numerically ordered. The compaction filter recovers `bucket_id` from the
/// trailing 8 bytes, so `bucket_id` must remain the last field.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct BitmapIndexKey {
    /// Encoded dimension: `[tag_byte][value_bytes]`.
    pub dimension_key: Vec<u8>,
    /// Bucket number: `seq / BUCKET_SIZE` in the CF's sequence space.
    pub bucket_id: u64,
}
334
/// Value stored in the bitmap CFs: the raw bytes of `RoaringBitmap::serialize_into`.
///
/// typed-store BCS-wraps this on disk (ULEB128 length prefix + raw bitmap
/// bytes). The merge operator decodes operands, ORs them, and re-encodes —
/// see `bitmap_union_merge_operator`.
///
/// The single field holds the serialized bitmap bytes, without the BCS
/// length prefix.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BitmapBlob(pub Vec<u8>);
342
343impl From<RoaringBitmap> for BitmapBlob {
344    fn from(bm: RoaringBitmap) -> Self {
345        let mut buf = Vec::with_capacity(bm.serialized_size());
346        bm.serialize_into(&mut buf)
347            .expect("RoaringBitmap::serialize_into on Vec cannot fail");
348        Self(buf)
349    }
350}
351
352fn ledger_tx_seq_digest(tx_sequence_number: u64, info: TxSeqDigestInfo) -> LedgerTxSeqDigest {
353    LedgerTxSeqDigest {
354        tx_sequence_number,
355        digest: info.digest,
356        event_count: info.event_count,
357        tx_offset: info.tx_offset,
358        checkpoint_number: info.checkpoint_number,
359    }
360}
361
362fn decode_ledger_bitmap_bucket(
363    key: BitmapIndexKey,
364    blob: BitmapBlob,
365) -> Result<LedgerBitmapBucket, TypedStoreError> {
366    let bitmap = RoaringBitmap::deserialize_from(&blob.0[..]).map_err(|e| {
367        TypedStoreError::SerializationError(format!("decode ledger bitmap bucket: {e}"))
368    })?;
369    Ok(LedgerBitmapBucket {
370        bucket_id: key.bucket_id,
371        bitmap,
372    })
373}
374
/// Which sequence space a bitmap CF is keyed by. Owns the whole-bucket
/// removability math the `BitmapCompactionFilter` applies.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BitmapKind {
    /// Buckets are ranges of tx_seq (`transaction_bitmap`).
    Transaction,
    /// Buckets are ranges of packed event_seq (`event_bitmap`).
    Event,
}
382
383impl BitmapKind {
384    /// Returns true when every seq in `bucket_id` is strictly below
385    /// `pruned_tx_seq_exclusive` — i.e. the whole bucket row is safe to drop.
386    ///
387    /// Both kinds bucket by integer division, but in different sequence
388    /// spaces, while the prune watermark is always in tx-seq space:
389    /// `Transaction` buckets are tx-seq ranges, so the watermark compares
390    /// directly; `Event` buckets are packed-event-seq ranges, so the watermark
391    /// is first converted to its lowest event-seq (`checked_event_seq_lo`).
392    ///
393    /// All arithmetic is `checked_*`: `bucket_id` is untrusted input decoded
394    /// from a rocksdb key, and an overflow must not panic the compaction
395    /// thread — it conservatively returns `false` (keep) instead.
396    fn bucket_fully_pruned(self, bucket_id: u64, pruned_tx_seq_exclusive: u64) -> bool {
397        match self {
398            BitmapKind::Transaction => bucket_id
399                .checked_add(1)
400                .and_then(|b| b.checked_mul(TX_BUCKET_SIZE))
401                .map(|hi| hi <= pruned_tx_seq_exclusive)
402                .unwrap_or(false),
403            BitmapKind::Event => {
404                let bucket_hi = bucket_id
405                    .checked_add(1)
406                    .and_then(|b| b.checked_mul(EVENT_BUCKET_SIZE));
407                let threshold = checked_event_seq_lo(pruned_tx_seq_exclusive);
408                match (bucket_hi, threshold) {
409                    (Some(hi), Some(th)) => hi <= th,
410                    _ => false,
411                }
412            }
413        }
414    }
415}
416
/// Runtime options supplied when opening the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Shared exclusive tx-seq prune floor for compaction filters.
    /// A zero floor keeps every bucket. The default value is a fresh
    /// counter initialized to zero.
    pub pruning_tx_seq_exclusive: Arc<AtomicU64>,
}
423
424fn default_table_options() -> typed_store::rocks::DBOptions {
425    typed_store::rocks::default_db_options().disable_write_throttling()
426}
427
428/// Like `default_table_options`, but honors range-delete tombstones immediately
429/// instead of ignoring them until compaction (the rocksdb default). `tx_seq_digest`
430/// is pruned with `schedule_delete_range` and `first_tx_seq_digest_key` reads the
431/// resulting pruning floor straight back, so tombstones must be visible to reads at
432/// once or the floor would not advance until a compaction happened to run.
433fn tx_seq_digest_table_options() -> typed_store::rocks::DBOptions {
434    let mut options = default_table_options();
435    options.rw_options = options.rw_options.clone().set_ignore_range_deletions(false);
436    options
437}
438
439fn balance_delta_merge_operator(
440    _key: &[u8],
441    existing_val: Option<&[u8]>,
442    operands: &MergeOperands,
443) -> Option<Vec<u8>> {
444    let mut result = if let Some(existing_val) = existing_val {
445        bcs::from_bytes::<BalanceIndexInfo>(existing_val)
446            .inspect_err(|e| {
447                tracing::error!(
448                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
449                )
450            })
451            .ok()?
452    } else {
453        BalanceIndexInfo::default()
454    };
455
456    for operand in operands.iter() {
457        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
458            .inspect_err(|e| {
459                tracing::error!(
460                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
461                )
462            })
463            .ok()?;
464        result.merge_delta(&delta);
465    }
466
467    Some(
468        bcs::to_bytes(&result)
469            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
470    )
471}
472
473fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
474    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
475        Ok(info) => info,
476        Err(_) => return Decision::Keep,
477    };
478
479    if balance_info.coin_balance_delta == 0 && balance_info.address_balance_delta == 0 {
480        Decision::Remove
481    } else {
482        Decision::Keep
483    }
484}
485
486fn balance_table_options() -> typed_store::rocks::DBOptions {
487    default_table_options()
488        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
489        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
490}
491
492// Bitmap inverted index: merge operator, compaction filter, options.
493
494fn decode_bitmap_blob(bcs_bytes: &[u8]) -> Result<RoaringBitmap, anyhow::Error> {
495    let blob: BitmapBlob = bcs::from_bytes(bcs_bytes)?;
496    Ok(RoaringBitmap::deserialize_from(&blob.0[..])?)
497}
498
499fn encode_bitmap_blob(bm: &RoaringBitmap) -> Vec<u8> {
500    let mut buf = Vec::with_capacity(bm.serialized_size());
501    bm.serialize_into(&mut buf)
502        .expect("RoaringBitmap::serialize_into on Vec cannot fail");
503    bcs::to_bytes(&BitmapBlob(buf)).expect("BCS encode of BitmapBlob cannot fail")
504}
505
506/// RocksDB merge operator for both bitmap CFs. ORs all operands (and any
507/// existing on-disk value) into a single bitmap.
508fn bitmap_union_merge_operator(
509    _key: &[u8],
510    existing_val: Option<&[u8]>,
511    operands: &MergeOperands,
512) -> Option<Vec<u8>> {
513    let mut acc = match existing_val {
514        Some(v) => match decode_bitmap_blob(v) {
515            Ok(bm) => bm,
516            Err(e) => {
517                tracing::error!(
518                    "Failed to deserialize existing BitmapBlob during merge - data corruption: {e}"
519                );
520                return None;
521            }
522        },
523        None => RoaringBitmap::new(),
524    };
525
526    for operand in operands.iter() {
527        match decode_bitmap_blob(operand) {
528            Ok(bm) => acc |= bm,
529            Err(e) => {
530                tracing::error!(
531                    "Failed to deserialize BitmapBlob operand during merge - data corruption: {e}"
532                );
533                return None;
534            }
535        }
536    }
537
538    // Convert dense containers to run containers before serializing the
539    // accumulated bitmap. This is the on-disk representation (the merge
540    // operator's output is what RocksDB stores), and a bucket that matches
541    // many consecutive tx_seqs compresses substantially as runs. Mirrors the
542    // BigTable bitmap committer, which optimizes each row before writing.
543    // Operands are not optimized — they carry a bit or a handful, so there is
544    // nothing for run-encoding to collapse.
545    acc.optimize();
546    Some(encode_bitmap_blob(&acc))
547}
548
/// Whole-bucket compaction filter for bitmap CFs. Reads the trailing 8 bytes
/// of a typed-store key as `bucket_id` (bincode big-endian fixed-int), then
/// removes the row iff the bucket is entirely below the current
/// `tx_seq_pruning_watermark` exclusive value.
///
/// Never `Remove` on a parse failure: silent data loss is worse than a stuck
/// row. The bucket math is `checked_*` because `bucket_id` is untrusted input
/// from rocksdb — a corrupted key shouldn't be able to panic the compaction
/// thread.
#[derive(Clone)]
pub struct BitmapCompactionFilter {
    /// Shared exclusive prune floor in tx-seq space; loaded on every
    /// `filter` call.
    pruning_tx_seq_exclusive: Arc<AtomicU64>,
    /// Sequence space of the CF this filter is attached to.
    kind: BitmapKind,
}
563
564impl BitmapCompactionFilter {
565    pub fn new(pruning_tx_seq_exclusive: Arc<AtomicU64>, kind: BitmapKind) -> Self {
566        Self {
567            pruning_tx_seq_exclusive,
568            kind,
569        }
570    }
571
572    pub fn filter(&self, key: &[u8], _value: &[u8]) -> Decision {
573        if key.len() < 8 {
574            warn!(
575                kind = ?self.kind,
576                "bitmap compaction filter saw key shorter than 8 bytes ({}); keeping",
577                key.len(),
578            );
579            return Decision::Keep;
580        }
581        let bucket_id =
582            u64::from_be_bytes(key[key.len() - 8..].try_into().expect("len checked above"));
583        let pruned_exclusive = self.pruning_tx_seq_exclusive.load(Ordering::Relaxed);
584
585        if self.kind.bucket_fully_pruned(bucket_id, pruned_exclusive) {
586            Decision::Remove
587        } else {
588            Decision::Keep
589        }
590    }
591}
592
593/// Default bitmap CF options. The merge operator must be present on every open;
594/// `bitmap_cf_options` adds the runtime compaction filter.
595fn bitmap_cf_default_options() -> typed_store::rocks::DBOptions {
596    default_table_options()
597        .set_merge_operator_associative("bitmap_union_merge", bitmap_union_merge_operator)
598}
599
600/// Bitmap CF options with the per-CF compaction filter attached.
601fn bitmap_cf_options(
602    filter_name: &str,
603    filter: BitmapCompactionFilter,
604) -> typed_store::rocks::DBOptions {
605    let mut options = bitmap_cf_default_options();
606    options
607        .options
608        .set_compaction_filter(filter_name, move |_level, key, value| {
609            filter.filter(key, value)
610        });
611    options
612}
613
614impl CoinIndexInfo {
615    fn merge(&mut self, other: Self) {
616        self.coin_metadata_object_id = self
617            .coin_metadata_object_id
618            .or(other.coin_metadata_object_id);
619        self.regulated_coin_metadata_object_id = self
620            .regulated_coin_metadata_object_id
621            .or(other.regulated_coin_metadata_object_id);
622        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
623    }
624}
625
/// RocksDB tables for the RpcIndexStore
///
/// Any time a new table is added, or an existing one has its schema changed, make sure to also
/// update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still be empty post
    ///   initialization)
    /// - version of the DB. Every time a table is added or a schema is changed the version
    ///   number needs to be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// A singleton recording which optional features this DB was built with
    /// (currently just ledger-history indexing). Kept separate from `meta` so
    /// the schema `version` need not encode feature flags. The value is
    /// `IndexSettings` as JSON bytes (see `IndexSettings`); auto-created on older
    /// DBs, and a missing entry reads as `IndexSettings::default()` (all features
    /// off).
    settings: DBMap<(), Vec<u8>>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in the event that the
    /// node was running with indexes enabled, then run for a period of time with indexes disabled,
    /// and then run with them enabled again so that the tables can be reinitialized.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a specific user
    /// account.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by a particular
    /// ObjectID.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the ObjectID of its
    /// corresponding CoinMetadata.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// An index of Balances.
    ///
    /// Allows looking up balances by owner address and coin type.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// An index of Package versions.
    ///
    /// Maps original package ID and version to the storage ID of that version.
    /// Allows efficient listing of all versions of a package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// `tx_sequence_number` → (digest, event_count, tx_offset, checkpoint_number).
    #[default_options_override_fn = "tx_seq_digest_table_options"]
    tx_seq_digest: DBMap<u64, TxSeqDigestInfo>,

    /// Transaction bitmap index keyed by `(dimension_key, tx_seq bucket)`.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    transaction_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,

    /// Event bitmap index keyed by `(dimension_key, packed event_seq bucket)`.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    event_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}
724
725/// Completely empty a column family and physically reclaim its disk. A single
726/// range tombstone over `[first, last)` (end-exclusive, so paired with a point
727/// delete of `last`) drops every row in O(1). The range is then compacted so
728/// the data is physically removed: this reclaims the space immediately rather
729/// than waiting for a background compaction, and makes the rows invisible to
730/// reads regardless of the CF's `ignore_range_deletions` setting.
731fn clear_table<K, V>(map: &DBMap<K, V>) -> Result<(), TypedStoreError>
732where
733    K: Serialize + DeserializeOwned,
734    V: Serialize + DeserializeOwned,
735{
736    let first = map.safe_iter().next().transpose()?.map(|(k, _)| k);
737    let last = map
738        .reversed_safe_iter_with_bounds(None, None)?
739        .next()
740        .transpose()?
741        .map(|(k, _)| k);
742    let (Some(first), Some(last)) = (first, last) else {
743        return Ok(());
744    };
745    let mut batch = map.batch();
746    batch.schedule_delete_range(map, &first, &last)?;
747    batch.delete_batch(map, std::iter::once(&last))?;
748    batch.write()?;
749    map.compact_range(&first, &last)
750}
751
752impl IndexStoreTables {
753    fn extract_version_if_package(
754        object: &Object,
755    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
756        if let Data::Package(package) = &object.data {
757            let original_id = package.original_package_id();
758            let version = package.version().value();
759            let storage_id = object.id();
760
761            let key = PackageVersionKey {
762                original_package_id: original_id,
763                version,
764            };
765            let info = PackageVersionInfo { storage_id };
766            return Some((key, info));
767        }
768        None
769    }
770
771    fn open_with_index_options<P: Into<PathBuf>>(
772        path: P,
773        index_options: IndexStoreOptions,
774    ) -> Self {
775        // The typed-store derive macro only honors the per-field
776        // `default_options_override_fn` when `tables_db_options_override` is
777        // `None`. As soon as we pass `Some(map)`, any CF missing from the map
778        // silently falls back to bare `default_db_options()` — losing the
779        // `disable_write_throttling` applied by `default_table_options`. To
780        // avoid that, populate the map with `default_table_options()` for every
781        // table before overriding the few that need bespoke configuration.
782        let mut table_options = std::collections::BTreeMap::new();
783        for (table_name, _) in IndexStoreTables::describe_tables() {
784            table_options.insert(table_name, default_table_options());
785        }
786        table_options.insert("balance".to_string(), balance_table_options());
787        // Range-delete pruning needs tombstones honored by reads immediately.
788        table_options.insert("tx_seq_digest".to_string(), tx_seq_digest_table_options());
789
790        let bitmap_filter_tx = BitmapCompactionFilter::new(
791            index_options.pruning_tx_seq_exclusive.clone(),
792            BitmapKind::Transaction,
793        );
794        let bitmap_filter_event = BitmapCompactionFilter::new(
795            index_options.pruning_tx_seq_exclusive.clone(),
796            BitmapKind::Event,
797        );
798        table_options.insert(
799            "transaction_bitmap".to_string(),
800            bitmap_cf_options("transaction_bitmap_filter", bitmap_filter_tx),
801        );
802        table_options.insert(
803            "event_bitmap".to_string(),
804            bitmap_cf_options("event_bitmap_filter", bitmap_filter_event),
805        );
806
807        IndexStoreTables::open_tables_read_write_with_deprecation_option(
808            path.into(),
809            MetricConf::new("rpc-index"),
810            None,
811            Some(DBMapTableConfigMap::new(table_options)),
812            true, // remove deprecated tables
813        )
814    }
815
816    fn open_with_options<P: Into<PathBuf>>(
817        path: P,
818        options: typed_store::rocksdb::Options,
819        table_options: Option<DBMapTableConfigMap>,
820    ) -> Self {
821        IndexStoreTables::open_tables_read_write_with_deprecation_option(
822            path.into(),
823            MetricConf::new("rpc-index"),
824            Some(options),
825            table_options,
826            true, // remove deprecated tables
827        )
828    }
829
830    /// Whether the ledger-history feature was enabled when this DB was built,
831    /// as persisted in the `settings` CF. A missing entry (fresh or pre-feature
832    /// DB) reads as `false`.
833    fn persisted_ledger_history_indexing(&self) -> bool {
834        self.read_settings().ledger_history_indexing
835    }
836
837    /// Read the persisted settings, defaulting when the row is absent or
838    /// unreadable.
839    fn read_settings(&self) -> IndexSettings {
840        self.settings
841            .get(&())
842            .ok()
843            .flatten()
844            .map(|bytes| decode_settings(&bytes))
845            .unwrap_or_default()
846    }
847
848    fn needs_to_do_initialization(
849        &self,
850        checkpoint_store: &CheckpointStore,
851        ledger_history_indexing: bool,
852    ) -> bool {
853        let schema_stale = match self.meta.get(&()) {
854            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
855            Ok(None) | Err(_) => true,
856        };
857        // *Enabling* ledger history requires a full rebuild to backfill the
858        // historical rows. *Disabling* it does not — the base indexes stay
859        // valid, so the now-unused history CFs are dropped in place by
860        // `disable_ledger_history_indexing` instead.
861        let enabling = ledger_history_indexing && !self.persisted_ledger_history_indexing();
862        schema_stale || enabling || self.is_indexed_watermark_out_of_date(checkpoint_store)
863    }
864
865    /// Drop the ledger-history column families and clear the persisted feature
866    /// flag without rebuilding the rest of the index. Used when a node that had
867    /// ledger history enabled restarts with it disabled: the base indexes are
868    /// untouched, so only the now-unused history rows need to go. Idempotent —
869    /// it runs at open before the store goes live, so a crash mid-way just
870    /// re-runs it on the next start.
871    fn disable_ledger_history_indexing(&self) -> Result<(), TypedStoreError> {
872        clear_table(&self.tx_seq_digest)?;
873        clear_table(&self.transaction_bitmap)?;
874        clear_table(&self.event_bitmap)?;
875        self.settings.insert(
876            &(),
877            &encode_settings(&IndexSettings {
878                ledger_history_indexing: false,
879            }),
880        )?;
881        Ok(())
882    }
883
884    // Check if the index watermark is behind the highets_executed watermark.
885    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
886        let highest_executed_checkpint = checkpoint_store
887            .get_highest_executed_checkpoint_seq_number()
888            .ok()
889            .flatten();
890        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
891        watermark < highest_executed_checkpint
892    }
893
894    #[tracing::instrument(skip_all)]
895    fn init(
896        &mut self,
897        authority_store: &AuthorityStore,
898        checkpoint_store: &CheckpointStore,
899        _epoch_store: &AuthorityPerEpochStore,
900        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
901        batch_size_limit: usize,
902        rpc_config: &sui_config::RpcConfig,
903    ) -> Result<(), StorageError> {
904        info!("Initializing RPC indexes");
905
906        // The restore target: the highest checkpoint whose transaction outputs
907        // (objects, effects) are durably committed to the perpetual store. The
908        // live-object indexing below reads the live object set directly, and the
909        // historical backfill covers `[lowest, restore_target]`, so this is the
910        // checkpoint the rebuilt index reflects. We stamp `Watermark::Indexed`
911        // here, and `commit_update_for_checkpoint` then only applies the next
912        // contiguous checkpoint (`watermark + 1`), dropping forward updates at
913        // or below the watermark. That is what stops the checkpoint executor --
914        // which resumes from the possibly-lagging `highest_executed` -- from
915        // re-applying checkpoints the restore already captured.
916        //
917        // We use the perpetual store's `highest_committed` watermark (written
918        // atomically with the objects) rather than the checkpoint store's
919        // `highest_executed` (bumped in a separate write afterward). An unclean
920        // stop can leave the object writes durable while `highest_executed`
921        // still lags, so the live object set can reflect a checkpoint beyond
922        // `highest_executed`; using `highest_committed` keeps the restore target
923        // (and therefore the drop floor) consistent with the set we actually
924        // read, so the executor's re-application of `(highest_executed,
925        // highest_committed]` is dropped rather than double-counted.
926        //
927        // Fall back to `highest_executed` for a database written before the
928        // atomic `highest_committed` watermark existed: it has no stamp yet, so
929        // this preserves the prior restore target until the next committed
930        // checkpoint stamps the consistent one. In normal operation
931        // `highest_committed` is written before `highest_executed` is bumped, so
932        // it is never absent while the executed watermark is present.
933        let restore_target = authority_store
934            .perpetual_tables
935            .get_highest_committed_checkpoint()?
936            .or(checkpoint_store.get_highest_executed_checkpoint_seq_number()?);
937        let lowest_available_checkpoint = checkpoint_store
938            .get_highest_pruned_checkpoint_seq_number()?
939            .map(|c| c.saturating_add(1))
940            .unwrap_or(0);
941        let lowest_available_checkpoint_objects = authority_store
942            .perpetual_tables
943            .get_highest_pruned_checkpoint()?
944            .map(|c| c.saturating_add(1))
945            .unwrap_or(0);
946        // Doing backfill requires processing objects so we have to restrict our backfill range
947        // to the range of checkpoints that we have objects for.
948        let lowest_available_checkpoint =
949            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);
950
951        let checkpoint_range =
952            restore_target.map(|restore_target| lowest_available_checkpoint..=restore_target);
953
954        if let Some(checkpoint_range) = checkpoint_range.clone() {
955            self.index_existing_checkpoints(authority_store, checkpoint_store, checkpoint_range)?;
956        }
957
958        if rpc_config.ledger_history_indexing()
959            && let Some(checkpoint_range) = checkpoint_range
960        {
961            self.backfill_ledger_history_indexes(
962                authority_store,
963                checkpoint_store,
964                checkpoint_range,
965            )?;
966        }
967
968        self.initialize_current_epoch(authority_store, checkpoint_store)?;
969
970        // Only index live objects if genesis checkpoint has been executed.
971        // If genesis hasn't been executed yet, the objects will be properly indexed
972        // as checkpoints are processed through the normal checkpoint execution path.
973        if restore_target.is_some() {
974            let coin_index = Mutex::new(HashMap::new());
975
976            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
977                tables: self,
978                coin_index: &coin_index,
979                batch_size_limit,
980            };
981
982            crate::par_index_live_object_set::par_index_live_object_set(
983                authority_store,
984                &make_live_object_indexer,
985            )?;
986
987            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
988        }
989
990        // Stamp the watermark at the restore target so forward indexing resumes
991        // (and the drop floor in `commit_update_for_checkpoint` sits) exactly at
992        // the checkpoint the rebuilt index reflects. When `restore_target` is
993        // `None` -- a fresh node with nothing committed yet -- leave the
994        // watermark absent rather than stamping 0: nothing has been indexed, so
995        // genesis must still be applied by forward indexing (`watermark + 1`
996        // starts at 0), not dropped as already-covered.
997        if let Some(restore_target) = restore_target {
998            self.watermark
999                .insert(&Watermark::Indexed, &restore_target)?;
1000        }
1001
1002        // Write the schema version and the feature settings in one batch so the
1003        // two can never be persisted independently.
1004        let mut batch = self.meta.batch();
1005        batch.insert_batch(
1006            &self.meta,
1007            [(
1008                (),
1009                MetadataInfo {
1010                    version: CURRENT_DB_VERSION,
1011                },
1012            )],
1013        )?;
1014        batch.insert_batch(
1015            &self.settings,
1016            [(
1017                (),
1018                encode_settings(&IndexSettings {
1019                    ledger_history_indexing: rpc_config.ledger_history_indexing(),
1020                }),
1021            )],
1022        )?;
1023        batch.write()?;
1024
1025        info!("Finished initializing RPC indexes");
1026
1027        Ok(())
1028    }
1029
1030    #[tracing::instrument(skip(self, authority_store, checkpoint_store))]
1031    fn index_existing_checkpoints(
1032        &mut self,
1033        authority_store: &AuthorityStore,
1034        checkpoint_store: &CheckpointStore,
1035        checkpoint_range: std::ops::RangeInclusive<u64>,
1036    ) -> Result<(), StorageError> {
1037        info!(
1038            "Indexing {} checkpoints in range {checkpoint_range:?}",
1039            checkpoint_range.size_hint().0
1040        );
1041        let start_time = Instant::now();
1042
1043        checkpoint_range.into_par_iter().try_for_each(|seq| {
1044            let Some(checkpoint) =
1045                sparse_checkpoint_for_epoch_backfill(authority_store, checkpoint_store, seq)?
1046            else {
1047                return Ok(());
1048            };
1049
1050            let mut batch = self.epochs.batch();
1051
1052            self.index_epoch(&checkpoint, &mut batch)?;
1053
1054            batch
1055                .write_opt(bulk_ingestion_write_options())
1056                .map_err(StorageError::from)
1057        })?;
1058
1059        info!(
1060            "Indexing checkpoints took {} seconds",
1061            start_time.elapsed().as_secs()
1062        );
1063        Ok(())
1064    }
1065
1066    /// Backfill ledger history rows over a freshly recreated rpc-index DB.
1067    ///
1068    /// Bulk writes disable WAL, so this flushes before `init()` writes the
1069    /// `meta.version` / `settings` markers; otherwise a crash could persist
1070    /// those markers without the rows they claim to cover.
1071    fn backfill_ledger_history_indexes(
1072        &self,
1073        authority_store: &AuthorityStore,
1074        checkpoint_store: &CheckpointStore,
1075        checkpoint_range: std::ops::RangeInclusive<u64>,
1076    ) -> Result<(), StorageError> {
1077        info!("ledger history backfill: cps {checkpoint_range:?}");
1078        let start_time = Instant::now();
1079
1080        checkpoint_range.clone().into_par_iter().try_for_each(
1081            |seq| -> Result<(), StorageError> {
1082                let checkpoint =
1083                    full_checkpoint_for_backfill(authority_store, checkpoint_store, seq)?
1084                        .ok_or_else(|| {
1085                            // Missing retained data would leave a permanent hole.
1086                            StorageError::missing(format!(
1087                                "ledger history backfill: checkpoint {seq} is missing from local \
1088                                 storage but falls inside the retained backfill range \
1089                                 {checkpoint_range:?}"
1090                            ))
1091                        })?;
1092                let mut batch = self.meta.batch();
1093                self.write_ledger_history_rows_for_checkpoint(&checkpoint, &mut batch)?;
1094                batch
1095                    .write_opt(bulk_ingestion_write_options())
1096                    .map_err(StorageError::from)
1097            },
1098        )?;
1099
1100        // Flushing one CF flushes the whole shared RocksDB instance.
1101        self.tx_seq_digest.flush().map_err(|e| {
1102            StorageError::custom(format!("flush after ledger history backfill: {e}"))
1103        })?;
1104
1105        info!(
1106            "ledger history backfill took {} seconds",
1107            start_time.elapsed().as_secs()
1108        );
1109        Ok(())
1110    }
1111
1112    /// The lowest live key of `tx_seq_digest`: the ledger history pruning
1113    /// floor in tx-seq space. `prune()` maintains the invariant that this
1114    /// equals the highest fully-pruned tx_seq (exclusive): pruning
1115    /// point-deletes `tx_seq_digest` rows below the floor, and forward
1116    /// indexing only adds rows above it. Returns `None` when the CF is empty
1117    /// (nothing indexed yet), which callers treat as floor 0.
1118    fn first_tx_seq_digest_key(&self) -> Result<Option<u64>, TypedStoreError> {
1119        match self.tx_seq_digest.safe_iter().next() {
1120            Some(Ok((k, _))) => Ok(Some(k)),
1121            Some(Err(e)) => Err(e),
1122            None => Ok(None),
1123        }
1124    }
1125
1126    /// Prune data from this Index. `pruned_tx_seq_exclusive` is the absolute
1127    /// tx-seq floor after this prune — the caller derives it from the
1128    /// last-pruned checkpoint's `network_total_transactions`.
1129    ///
1130    /// When ledger history is enabled, the compaction-filter atomic is moved to
1131    /// the new floor *after* the batch commits, so the atomic never leads disk.
1132    fn prune(
1133        &self,
1134        pruned_checkpoint_watermark: u64,
1135        pruned_tx_seq_exclusive: u64,
1136        ledger_history_enabled: bool,
1137        pruning_atomic: &AtomicU64,
1138    ) -> Result<(), TypedStoreError> {
1139        let mut batch = self.watermark.batch();
1140
1141        batch.insert_batch(
1142            &self.watermark,
1143            [(Watermark::Pruned, pruned_checkpoint_watermark)],
1144        )?;
1145
1146        if ledger_history_enabled {
1147            // First live `tx_seq_digest` key = current floor, read from disk so a
1148            // prune skipped by a crash self-heals on the next call. The range
1149            // delete shares the batch with the `Watermark::Pruned` insert above.
1150            let prev_exclusive = self.first_tx_seq_digest_key()?.unwrap_or(0);
1151            batch.schedule_delete_range(
1152                &self.tx_seq_digest,
1153                &prev_exclusive,
1154                &pruned_tx_seq_exclusive,
1155            )?;
1156        }
1157
1158        batch.write()?;
1159
1160        if ledger_history_enabled {
1161            // After the commit so the atomic never leads disk.
1162            pruning_atomic.store(pruned_tx_seq_exclusive, Ordering::Relaxed);
1163        }
1164        Ok(())
1165    }
1166
1167    /// Index a Checkpoint
1168    fn index_checkpoint(
1169        &self,
1170        checkpoint: &Checkpoint,
1171        ledger_history_enabled: bool,
1172    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
1173        debug!(
1174            checkpoint = checkpoint.summary.sequence_number,
1175            "indexing checkpoint"
1176        );
1177
1178        let mut batch = self.owner.batch();
1179
1180        self.index_epoch(checkpoint, &mut batch)?;
1181        self.index_transactions(checkpoint, &mut batch)?;
1182        self.index_objects(checkpoint, &mut batch)?;
1183
1184        // Ledger history rows ride the same batch as `Watermark::Indexed`.
1185        if ledger_history_enabled {
1186            self.write_ledger_history_rows_for_checkpoint(checkpoint, &mut batch)?;
1187        }
1188
1189        batch.insert_batch(
1190            &self.watermark,
1191            [(Watermark::Indexed, checkpoint.summary.sequence_number)],
1192        )?;
1193
1194        debug!(
1195            checkpoint = checkpoint.summary.sequence_number,
1196            "finished indexing checkpoint"
1197        );
1198
1199        Ok(batch)
1200    }
1201
1202    fn index_epoch(
1203        &self,
1204        checkpoint: &Checkpoint,
1205        batch: &mut typed_store::rocks::DBBatch,
1206    ) -> Result<(), StorageError> {
1207        let Some(epoch_info) = checkpoint.epoch_info()? else {
1208            return Ok(());
1209        };
1210        if epoch_info.epoch > 0 {
1211            let prev_epoch = epoch_info.epoch - 1;
1212            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
1213            current_epoch.epoch = prev_epoch; // set this incase there wasn't an entry
1214            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
1215            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
1216            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
1217        }
1218        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
1219        Ok(())
1220    }
1221
1222    // After attempting to reindex past epochs, ensure that the current epoch is at least partially
1223    // initalized
1224    fn initialize_current_epoch(
1225        &mut self,
1226        authority_store: &AuthorityStore,
1227        checkpoint_store: &CheckpointStore,
1228    ) -> Result<(), StorageError> {
1229        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
1230            return Ok(());
1231        };
1232
1233        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
1234            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
1235
1236        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
1237        epoch.epoch = checkpoint.epoch;
1238
1239        if epoch.protocol_version.is_none() {
1240            epoch.protocol_version = Some(system_state.protocol_version());
1241        }
1242
1243        if epoch.start_timestamp_ms.is_none() {
1244            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
1245        }
1246
1247        if epoch.reference_gas_price.is_none() {
1248            epoch.reference_gas_price = Some(system_state.reference_gas_price());
1249        }
1250
1251        if epoch.system_state.is_none() {
1252            epoch.system_state = Some(system_state);
1253        }
1254
1255        self.epochs.insert(&epoch.epoch, &epoch)?;
1256
1257        Ok(())
1258    }
1259
1260    fn index_transactions(
1261        &self,
1262        checkpoint: &Checkpoint,
1263        batch: &mut typed_store::rocks::DBBatch,
1264    ) -> Result<(), StorageError> {
1265        for tx in &checkpoint.transactions {
1266            let balance_changes = sui_types::balance_change::derive_detailed_balance_changes_2(
1267                &tx.effects,
1268                &checkpoint.object_set,
1269            )
1270            .into_iter()
1271            .filter_map(|change| {
1272                if let TypeTag::Struct(coin_type) = change.coin_type {
1273                    Some((
1274                        BalanceKey {
1275                            owner: change.address,
1276                            coin_type: *coin_type,
1277                        },
1278                        BalanceIndexInfo {
1279                            coin_balance_delta: change.coin_amount,
1280                            address_balance_delta: change.address_amount,
1281                        },
1282                    ))
1283                } else {
1284                    None
1285                }
1286            });
1287            batch.partial_merge_batch(&self.balance, balance_changes)?;
1288        }
1289
1290        Ok(())
1291    }
1292
1293    /// Emit `tx_seq_digest` rows and bitmap merge operands for every tx in
1294    /// `checkpoint`. Shared by forward indexing (`index_checkpoint`) and the
1295    /// rebuild-time ledger history backfill. There is no separate watermark:
1296    /// `Watermark::Indexed` is the source of truth for coverage.
1297    fn write_ledger_history_rows_for_checkpoint(
1298        &self,
1299        checkpoint: &Checkpoint,
1300        batch: &mut typed_store::rocks::DBBatch,
1301    ) -> Result<(), StorageError> {
1302        let cp_seq = checkpoint.summary.sequence_number;
1303        let net_total = checkpoint.summary.data().network_total_transactions;
1304        let tx_count = checkpoint.transactions.len() as u64;
1305        // `network_total_transactions` is cumulative *including* this cp.
1306        // checked_sub: if the cp's network_total_transactions is somehow
1307        // less than its own tx count, surface an error rather than wrap.
1308        let tx_lo = net_total.checked_sub(tx_count).ok_or_else(|| {
1309            StorageError::custom(format!(
1310                "checkpoint {cp_seq}: network_total_transactions ({net_total}) \
1311                 < tx_count ({tx_count})"
1312            ))
1313        })?;
1314
1315        let object_set = &checkpoint.object_set;
1316
1317        // Group tx-space bitmap bits across the whole checkpoint so repeated
1318        // dimensions in the same tx bucket produce one Rocks merge operand.
1319        let mut tx_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();
1320
1321        for (i, tx) in checkpoint.transactions.iter().enumerate() {
1322            let tx_seq = tx_lo + i as u64;
1323
1324            let tx_data = &tx.transaction;
1325            let digest = *tx.effects.transaction_digest();
1326            let event_count = tx.events.as_ref().map(|e| e.data.len() as u32).unwrap_or(0);
1327
1328            // tx_seq_digest: one direct row per tx, no merge needed.
1329            batch.insert_batch(
1330                &self.tx_seq_digest,
1331                [(
1332                    tx_seq,
1333                    TxSeqDigestInfo {
1334                        digest,
1335                        event_count,
1336                        // `i` is the transaction's zero-based position within this checkpoint.
1337                        tx_offset: i as u32,
1338                        checkpoint_number: cp_seq,
1339                    },
1340                )],
1341            )?;
1342
1343            // Tx-space bitmap: dedup dimension_keys within this tx, then add
1344            // this tx's bit to the checkpoint-scoped bitmap group.
1345            let tx_bucket = tx_seq / TX_BUCKET_SIZE;
1346            let tx_bit = (tx_seq % TX_BUCKET_SIZE) as u32;
1347            let mut tx_dim_keys: FxHashSet<Vec<u8>> = FxHashSet::default();
1348            for_each_transaction_dimension(
1349                tx_data,
1350                &tx.effects,
1351                tx.events.as_ref(),
1352                object_set,
1353                |dim, value| {
1354                    tx_dim_keys.insert(encode_dimension_key(dim, value));
1355                },
1356            );
1357            for dim_key in tx_dim_keys {
1358                tx_groups
1359                    .entry((dim_key, tx_bucket))
1360                    .or_default()
1361                    .insert(tx_bit);
1362            }
1363
1364            // Event-space bitmap: bits from multiple events of the same tx
1365            // can share a (dim_key, bucket); group into a RoaringBitmap so
1366            // we emit at most one operand per group.
1367            let mut event_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();
1368            let mut event_seq_error = None;
1369            for_each_event_dimension(
1370                tx_data.sender(),
1371                &tx.effects,
1372                tx.events.as_ref(),
1373                |event_idx, dim, value| {
1374                    let event_seq = match checked_encode_event_seq(tx_seq, event_idx) {
1375                        Ok(event_seq) => event_seq,
1376                        Err(e) => {
1377                            event_seq_error.get_or_insert(e);
1378                            return;
1379                        }
1380                    };
1381                    let bucket = event_seq / EVENT_BUCKET_SIZE;
1382                    let bit = (event_seq % EVENT_BUCKET_SIZE) as u32;
1383                    event_groups
1384                        .entry((encode_dimension_key(dim, value), bucket))
1385                        .or_default()
1386                        .insert(bit);
1387                },
1388            );
1389            if let Some(e) = event_seq_error {
1390                return Err(e);
1391            }
1392            let event_ops = event_groups.into_iter().map(|((dim_key, bucket), bm)| {
1393                (
1394                    BitmapIndexKey {
1395                        dimension_key: dim_key,
1396                        bucket_id: bucket,
1397                    },
1398                    BitmapBlob::from(bm),
1399                )
1400            });
1401            batch.partial_merge_batch(&self.event_bitmap, event_ops)?;
1402        }
1403
1404        let tx_ops = tx_groups.into_iter().map(|((dim_key, bucket), bm)| {
1405            (
1406                BitmapIndexKey {
1407                    dimension_key: dim_key,
1408                    bucket_id: bucket,
1409                },
1410                BitmapBlob::from(bm),
1411            )
1412        });
1413        batch.partial_merge_batch(&self.transaction_bitmap, tx_ops)?;
1414
1415        Ok(())
1416    }
1417
1418    fn index_objects(
1419        &self,
1420        checkpoint: &Checkpoint,
1421        batch: &mut typed_store::rocks::DBBatch,
1422    ) -> Result<(), StorageError> {
1423        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
1424        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];
1425        let object_set = &checkpoint.object_set;
1426
1427        for tx in &checkpoint.transactions {
1428            // determine changes from removed objects
1429            for removed_object in tx_removed_objects_pre_version(tx, object_set) {
1430                match removed_object.owner() {
1431                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
1432                        let owner_key = OwnerIndexKey::from_object(removed_object);
1433                        batch.delete_batch(&self.owner, [owner_key])?;
1434                    }
1435                    Owner::ObjectOwner(object_id) => {
1436                        batch.delete_batch(
1437                            &self.dynamic_field,
1438                            [DynamicFieldKey::new(*object_id, removed_object.id())],
1439                        )?;
1440                    }
1441                    Owner::Shared { .. } | Owner::Immutable => {}
1442                    Owner::Party { .. } => {
1443                        // TODO(Party WIP)
1444                        todo!("Party WIP");
1445                        // We could maybe look at non-default permissions for the owner. But
1446                        // I'm not sure what this is really used for
1447                    }
1448                }
1449            }
1450
1451            // determine changes from changed objects
1452            for (object, old_object) in tx_changed_objects(tx, object_set) {
1453                if let Some(old_object) = old_object {
1454                    match old_object.owner() {
1455                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
1456                            let owner_key = OwnerIndexKey::from_object(old_object);
1457                            batch.delete_batch(&self.owner, [owner_key])?;
1458                        }
1459
1460                        Owner::ObjectOwner(object_id) => {
1461                            if old_object.owner() != object.owner() {
1462                                batch.delete_batch(
1463                                    &self.dynamic_field,
1464                                    [DynamicFieldKey::new(*object_id, old_object.id())],
1465                                )?;
1466                            }
1467                        }
1468
1469                        Owner::Shared { .. } | Owner::Immutable => {}
1470
1471                        Owner::Party { .. } => {
1472                            // TODO(Party WIP)
1473                            todo!("Party WIP");
1474                            // We could maybe look at non-default permissions for the owner. But
1475                            // I'm not sure what this is really used for
1476                        }
1477                    }
1478                }
1479
1480                match object.owner() {
1481                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
1482                        let owner_key = OwnerIndexKey::from_object(object);
1483                        let owner_info = OwnerIndexInfo::new(object);
1484                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
1485                    }
1486                    Owner::ObjectOwner(parent) => {
1487                        if should_index_dynamic_field(object) {
1488                            let field_key = DynamicFieldKey::new(*parent, object.id());
1489                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
1490                        }
1491                    }
1492                    Owner::Shared { .. } | Owner::Immutable => {}
1493                    // TODO(Party WIP)
1494                    Owner::Party { .. } => todo!("Party WIP"),
1495                }
1496                if let Some((key, info)) = Self::extract_version_if_package(object) {
1497                    package_version_index.push((key, info));
1498                }
1499            }
1500
1501            // coin indexing
1502            //
1503            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are created in
1504            // the same transaction so we don't need to worry about overriding any older value
1505            // that may exist in the database (because there necessarily cannot be).
1506            for (key, value) in tx
1507                .created_objects(object_set)
1508                .flat_map(try_create_coin_index_info)
1509            {
1510                use std::collections::hash_map::Entry;
1511
1512                match coin_index.entry(key) {
1513                    Entry::Occupied(mut o) => {
1514                        o.get_mut().merge(value);
1515                    }
1516                    Entry::Vacant(v) => {
1517                        v.insert(value);
1518                    }
1519                }
1520            }
1521        }
1522
1523        batch.insert_batch(&self.coin, coin_index)?;
1524        batch.insert_batch(&self.package_version, package_version_index)?;
1525
1526        Ok(())
1527    }
1528
1529    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
1530        self.epochs.get(&epoch)
1531    }
1532
1533    fn owner_iter(
1534        &self,
1535        owner: SuiAddress,
1536        object_type: Option<StructTag>,
1537        cursor: Option<OwnerIndexKey>,
1538    ) -> Result<
1539        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1540        TypedStoreError,
1541    > {
1542        // TODO can we figure out how to pass a raw byte array as a cursor?
1543        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
1544            owner,
1545            object_type: object_type
1546                .clone()
1547                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
1548            inverted_balance: None,
1549            object_id: ObjectID::ZERO,
1550        });
1551
1552        Ok(self
1553            .owner
1554            .safe_iter_with_bounds(Some(lower_bound), None)
1555            .take_while(move |item| {
1556                // If there's an error let if flow through
1557                let Ok((key, _)) = item else {
1558                    return true;
1559                };
1560
1561                // Only take if owner matches
1562                key.owner == owner
1563                    // and if an object type was supplied that the type matches
1564                    && object_type
1565                        .as_ref()
1566                        .map(|ty| {
1567                            ty.address == key.object_type.address
1568                                && ty.module == key.object_type.module
1569                                && ty.name == key.object_type.name
1570                                // If type_params are not provided then we match all params
1571                                && (ty.type_params.is_empty() ||
1572                                    // If they are provided the type params must match
1573                                    ty.type_params == key.object_type.type_params)
1574                        }).unwrap_or(true)
1575            }))
1576    }
1577
1578    fn dynamic_field_iter(
1579        &self,
1580        parent: ObjectID,
1581        cursor: Option<ObjectID>,
1582    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1583    {
1584        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1585        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1586        let iter = self
1587            .dynamic_field
1588            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1589            .map_ok(|(key, ())| key);
1590        Ok(iter)
1591    }
1592
1593    fn get_coin_info(
1594        &self,
1595        coin_type: &StructTag,
1596    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1597        let key = CoinIndexKey {
1598            coin_type: coin_type.to_owned(),
1599        };
1600        self.coin.get(&key)
1601    }
1602
1603    fn get_balance(
1604        &self,
1605        owner: &SuiAddress,
1606        coin_type: &StructTag,
1607    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1608        let key = BalanceKey {
1609            owner: owner.to_owned(),
1610            coin_type: coin_type.to_owned(),
1611        };
1612        self.balance.get(&key)
1613    }
1614
1615    fn balance_iter(
1616        &self,
1617        owner: SuiAddress,
1618        cursor: Option<BalanceKey>,
1619    ) -> Result<
1620        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1621        TypedStoreError,
1622    > {
1623        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1624            owner,
1625            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1626        });
1627
1628        Ok(self
1629            .balance
1630            .safe_iter_with_bounds(Some(lower_bound), None)
1631            .scan((), move |_, item| {
1632                match item {
1633                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1634                    Ok(_) => None,          // Different owner, stop iteration
1635                    Err(e) => Some(Err(e)), // Propagate error
1636                }
1637            }))
1638    }
1639
1640    fn package_versions_iter(
1641        &self,
1642        original_id: ObjectID,
1643        cursor: Option<u64>,
1644    ) -> Result<
1645        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1646        TypedStoreError,
1647    > {
1648        let lower_bound = PackageVersionKey {
1649            original_package_id: original_id,
1650            version: cursor.unwrap_or(0),
1651        };
1652        let upper_bound = PackageVersionKey {
1653            original_package_id: original_id,
1654            version: u64::MAX,
1655        };
1656
1657        Ok(self
1658            .package_version
1659            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1660    }
1661}
1662
/// Handle to the rpc-index database: the index tables plus the per-checkpoint index updates
/// that have been staged but not yet written.
pub struct RpcIndexStore {
    /// The index tables backing this store.
    tables: IndexStoreTables,
    /// Batches staged by `index_checkpoint`, keyed by checkpoint sequence number, waiting to
    /// be taken out again by `commit_update_for_checkpoint`.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    /// Shared with the bitmap compaction filters. Advanced by `prune()` after
    /// the corresponding watermark batch commits, so compactions never see a
    /// value that hasn't been persisted.
    ledger_history_pruning_watermark: Arc<AtomicU64>,
    /// True iff this rpc-index DB was built with ledger history indexing
    /// enabled. Derived once at open from the persisted `settings` CF and used
    /// as the gate for forward indexing and pruning.
    ledger_history_enabled: bool,
}
1675
1676impl RpcIndexStore {
1677    /// Given the provided directory, construct the path to the db
1678    fn db_path(dir: &Path) -> PathBuf {
1679        dir.join("rpc-index")
1680    }
1681
1682    pub async fn new(
1683        dir: &Path,
1684        authority_store: &AuthorityStore,
1685        checkpoint_store: &CheckpointStore,
1686        epoch_store: &AuthorityPerEpochStore,
1687        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1688        rpc_config: sui_config::RpcConfig,
1689    ) -> Self {
1690        // Internal-only tx-seq floor, hydrated from disk on open.
1691        let ledger_history_pruning_watermark = Arc::new(AtomicU64::new(0));
1692        let index_options = IndexStoreOptions {
1693            pruning_tx_seq_exclusive: ledger_history_pruning_watermark,
1694        };
1695
1696        Self::new_with_options(
1697            dir,
1698            authority_store,
1699            checkpoint_store,
1700            epoch_store,
1701            package_store,
1702            index_options,
1703            rpc_config,
1704        )
1705        .await
1706    }
1707
    /// Opens the rpc-index database under `dir`, rebuilding it from scratch when required.
    ///
    /// When `needs_to_do_initialization` reports that a rebuild is needed, the existing
    /// database is destroyed, reopened with bulk-load oriented RocksDB options (unordered
    /// writes, auto compactions disabled, large write buffers), populated through `init`,
    /// flushed, released, and finally reopened with the regular index options. Otherwise the
    /// already-open tables are kept, and ledger history indexing is disabled on them if it was
    /// persisted as enabled but the config now turns it off.
    ///
    /// In both cases the pruning watermark carried by `index_options` is hydrated from the
    /// opened tables before the store is returned.
    ///
    /// # Panics
    ///
    /// Panics if the old database cannot be destroyed, if only one of the column family write
    /// buffer size / count overrides is configured, if initialization or the final flush
    /// fails, if the bulk-load database handle is not released within 30 seconds, if the
    /// reopened database does not carry the expected schema version and ledger-history
    /// setting, or if disabling ledger history indexing fails.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        // Keep our own handle to the shared watermark; `index_options` is moved further down.
        let ledger_history_atomic = index_options.pruning_tx_seq_exclusive.clone();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            // Rebuild if the schema or watermarks are stale, or ledger history
            // is being enabled (which needs a backfill).
            if tables
                .needs_to_do_initialization(checkpoint_store, rpc_config.ledger_history_indexing())
            {
                // Assigned inside the block below and consumed by `init` afterwards.
                let batch_size_limit;

                let mut tables = {
                    // Release the handle opened above before destroying the on-disk database.
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Open the empty DB with `unordered_write`s enabled in order to get a ~3x
                    // speedup when indexing
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Allow CPU-intensive flushing operations to use all CPUs.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // We are disabling compaction for all column families below. This means we can
                    // also disable the backpressure that slows down writes when the number of L0
                    // files builds up since we will never compact them anyway.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // This is an upper bound on the amount of RAM the memtables can use across
                    // all column families.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        // Default to 80% of system RAM
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Create column family specific options.
                    let mut table_config_map = BTreeMap::new();

                    // Create options with compactions disabled and large write buffers.
                    // Each CF can use up to 25% of system RAM, but total is still limited by
                    // set_db_write_buffer_size configured above.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    // The two per-CF overrides only make sense together, so they are matched
                    // as a pair: both set, both unset, or a configuration error.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Calculate buffer configuration: 25% of RAM split across buffers
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB minimum

                            // Target number of buffers based on CPU count
                            // More CPUs = more parallel flushing capability
                            let target_buffer_count = num_cpus::get().max(2);

                            // Aim for CPU-based buffer count, but reduce if it would make buffers too small
                            //   For example:
                            // - 128GB RAM, 32 CPUs: 32GB per CF / 32 buffers = 1GB each
                            // - 16GB RAM, 8 CPUs: 4GB per CF / 8 buffers = 512MB each
                            // - 4GB RAM, 8 CPUs: 1GB per CF / 64MB min = ~16 buffers of 64MB each
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Calculate batch size limit: default to half the buffer size or 128MB, whichever is smaller
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27; // 128MB
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply cf_options to all tables
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // Override Balance options with the merge operator
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    // Both bitmap tables get the union merge operator plus a compaction
                    // filter that shares the pruning watermark from `index_options`.
                    let bitmap_filter_tx = BitmapCompactionFilter::new(
                        index_options.pruning_tx_seq_exclusive.clone(),
                        BitmapKind::Transaction,
                    );
                    let bitmap_filter_event = BitmapCompactionFilter::new(
                        index_options.pruning_tx_seq_exclusive.clone(),
                        BitmapKind::Event,
                    );
                    let mut transaction_bitmap_opts = cf_options.clone();
                    transaction_bitmap_opts = transaction_bitmap_opts
                        .set_merge_operator_associative(
                            "bitmap_union_merge",
                            bitmap_union_merge_operator,
                        );
                    transaction_bitmap_opts.options.set_compaction_filter(
                        "transaction_bitmap_filter",
                        move |_level, key, value| bitmap_filter_tx.filter(key, value),
                    );
                    table_config_map
                        .insert("transaction_bitmap".to_string(), transaction_bitmap_opts);

                    let mut event_bitmap_opts = cf_options.clone();
                    event_bitmap_opts = event_bitmap_opts.set_merge_operator_associative(
                        "bitmap_union_merge",
                        bitmap_union_merge_operator,
                    );
                    event_bitmap_opts
                        .options
                        .set_compaction_filter("event_bitmap_filter", move |_level, key, value| {
                            bitmap_filter_event.filter(key, value)
                        });
                    table_config_map.insert("event_bitmap".to_string(), event_bitmap_opts);

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                // Flush all data to disk before dropping tables.
                // This is critical because WAL is disabled during bulk indexing.
                // Note we only need to call flush on one table because all tables share the same
                // underlying database.
                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Keep only a weak handle so we can observe when the DB is fully released.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                // Poll (for at most 30 seconds) until no strong reference to the bulk-load DB
                // remains, and only then reopen it below.
                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen the DB with default options (eg without `unordered_write`s enabled)
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: verify the schema version and feature settings
                // were persisted correctly.
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {:#x}, found {:#x}",
                    CURRENT_DB_VERSION, stored_version.version
                );
                assert_eq!(
                    reopened_tables.persisted_ledger_history_indexing(),
                    rpc_config.ledger_history_indexing(),
                    "ledger-history setting mismatch after flush and reopen"
                );

                reopened_tables
            } else {
                // No rebuild needed. If ledger history was on and is now off,
                // drop just those CFs rather than reindexing everything.
                if tables.persisted_ledger_history_indexing()
                    && !rpc_config.ledger_history_indexing()
                {
                    tables
                        .disable_ledger_history_indexing()
                        .expect("unable to disable ledger history indexing");
                }
                tables
            }
        };

        // Hydrate before compaction filters can observe the default 0 floor.
        Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);

        // `ledger_history_enabled` is derived from the persisted `settings` CF,
        // not directly from config.
        let ledger_history_enabled = tables.persisted_ledger_history_indexing();
        debug_assert_eq!(
            ledger_history_enabled,
            rpc_config.ledger_history_indexing(),
            "ledger_history_enabled (from settings CF) must match the configured ledger_history_indexing flag"
        );

        Self {
            tables,
            pending_updates: Default::default(),
            ledger_history_pruning_watermark: ledger_history_atomic,
            ledger_history_enabled,
        }
    }
2012
2013    /// Hydrate the tx-seq pruning floor from the first live `tx_seq_digest` key.
2014    fn hydrate_ledger_history_pruning_atomic(tables: &IndexStoreTables, atomic: &Arc<AtomicU64>) {
2015        let persisted = tables.first_tx_seq_digest_key().ok().flatten().unwrap_or(0);
2016        atomic.store(persisted, Ordering::Relaxed);
2017    }
2018
2019    pub fn new_without_init(dir: &Path) -> Self {
2020        let path = Self::db_path(dir);
2021
2022        // Keep already-built ledger history indexes prunable in offline paths.
2023        let ledger_history_atomic = Arc::new(AtomicU64::new(0));
2024        let index_options = IndexStoreOptions {
2025            pruning_tx_seq_exclusive: ledger_history_atomic.clone(),
2026        };
2027        let tables = IndexStoreTables::open_with_index_options(path, index_options);
2028
2029        let ledger_history_enabled = tables.persisted_ledger_history_indexing();
2030        Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);
2031
2032        Self {
2033            tables,
2034            pending_updates: Default::default(),
2035            ledger_history_pruning_watermark: ledger_history_atomic,
2036            ledger_history_enabled,
2037        }
2038    }
2039
2040    pub fn prune(
2041        &self,
2042        pruned_checkpoint_watermark: u64,
2043        pruned_tx_seq_exclusive: u64,
2044    ) -> Result<(), TypedStoreError> {
2045        self.tables.prune(
2046            pruned_checkpoint_watermark,
2047            pruned_tx_seq_exclusive,
2048            self.ledger_history_enabled,
2049            &self.ledger_history_pruning_watermark,
2050        )
2051    }
2052
2053    /// Index a checkpoint and stage the index updated in `pending_updates`.
2054    ///
2055    /// Updates will not be committed to the database until `commit_update_for_checkpoint` is
2056    /// called.
2057    #[tracing::instrument(
2058        skip_all,
2059        fields(checkpoint = checkpoint.summary.sequence_number)
2060    )]
2061    pub fn index_checkpoint(&self, checkpoint: &Checkpoint) {
2062        let sequence_number = checkpoint.summary.sequence_number;
2063        let batch = self
2064            .tables
2065            .index_checkpoint(checkpoint, self.ledger_history_enabled)
2066            .expect("db error");
2067
2068        self.pending_updates
2069            .lock()
2070            .unwrap()
2071            .insert(sequence_number, batch);
2072    }
2073
2074    /// Commits the pending updates for the provided checkpoint number.
2075    ///
2076    /// Invariants:
2077    /// - `index_checkpoint` must have been called for the provided checkpoint
2078    /// - Callers of this function must ensure that it is called for each checkpoint in sequential
2079    ///   order. This will panic if the provided checkpoint does not match the expected next
2080    ///   checkpoint to commit.
2081    ///
2082    /// Forward updates are gated on `Watermark::Indexed`, the source of truth for
2083    /// index coverage: only the next contiguous checkpoint (`watermark + 1`, or 0
2084    /// when nothing is indexed yet) is written, and its batch advances the
2085    /// watermark. A checkpoint at or below the watermark is already reflected in
2086    /// the index, so its batch is dropped rather than re-applied -- this is what
2087    /// prevents double-counting after a bulk restore. The restore stamps the
2088    /// watermark at the checkpoint the live object set reflects
2089    /// (`highest_committed`), but the checkpoint executor resumes from the
2090    /// separately-bumped `highest_executed`, which can lag; without this drop it
2091    /// would re-apply the `(highest_executed, highest_committed]` checkpoints the
2092    /// restore already captured, double-counting additive indexes like balances.
2093    /// A checkpoint above `watermark + 1` is a gap that would leave a hole in the
2094    /// index, so it panics.
2095    #[tracing::instrument(skip(self))]
2096    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
2097        let next_batch = self.pending_updates.lock().unwrap().pop_first();
2098
2099        // Its expected that the next batch exists
2100        let (next_sequence_number, batch) = next_batch.unwrap();
2101        assert_eq!(
2102            checkpoint, next_sequence_number,
2103            "commit_update_for_checkpoint must be called in order"
2104        );
2105
2106        let indexed = self.tables.watermark.get(&Watermark::Indexed)?;
2107        let expected_next = indexed.map_or(0, |w| w + 1);
2108        if checkpoint < expected_next {
2109            // Already covered (by the bulk restore or a prior commit). Dropping
2110            // the batch avoids re-applying its additive index updates.
2111            debug!(
2112                checkpoint,
2113                expected_next, "dropping already-indexed checkpoint update"
2114            );
2115            return Ok(());
2116        }
2117        assert_eq!(
2118            checkpoint, expected_next,
2119            "rpc-index forward update is not contiguous: expected checkpoint {expected_next}, \
2120             got {checkpoint}"
2121        );
2122
2123        Ok(batch.write()?)
2124    }
2125
2126    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
2127        self.tables.get_epoch_info(epoch)
2128    }
2129
2130    pub fn owner_iter(
2131        &self,
2132        owner: SuiAddress,
2133        object_type: Option<StructTag>,
2134        cursor: Option<OwnerIndexKey>,
2135    ) -> Result<
2136        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
2137        TypedStoreError,
2138    > {
2139        self.tables.owner_iter(owner, object_type, cursor)
2140    }
2141
2142    pub fn dynamic_field_iter(
2143        &self,
2144        parent: ObjectID,
2145        cursor: Option<ObjectID>,
2146    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
2147    {
2148        self.tables.dynamic_field_iter(parent, cursor)
2149    }
2150
2151    pub fn get_coin_info(
2152        &self,
2153        coin_type: &StructTag,
2154    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
2155        self.tables.get_coin_info(coin_type)
2156    }
2157
2158    pub fn get_balance(
2159        &self,
2160        owner: &SuiAddress,
2161        coin_type: &StructTag,
2162    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
2163        self.tables.get_balance(owner, coin_type)
2164    }
2165
2166    pub fn balance_iter(
2167        &self,
2168        owner: SuiAddress,
2169        cursor: Option<BalanceKey>,
2170    ) -> Result<
2171        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
2172        TypedStoreError,
2173    > {
2174        self.tables.balance_iter(owner, cursor)
2175    }
2176
2177    pub fn package_versions_iter(
2178        &self,
2179        original_id: ObjectID,
2180        cursor: Option<u64>,
2181    ) -> Result<
2182        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
2183        TypedStoreError,
2184    > {
2185        self.tables.package_versions_iter(original_id, cursor)
2186    }
2187
2188    pub fn get_highest_indexed_checkpoint_seq_number(
2189        &self,
2190    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
2191        self.tables.watermark.get(&Watermark::Indexed)
2192    }
2193
2194    fn ensure_ledger_history_enabled(&self) -> Result<(), TypedStoreError> {
2195        if self.ledger_history_enabled {
2196            Ok(())
2197        } else {
2198            Err(TypedStoreError::SerializationError(
2199                "ledger history indexing is disabled".to_owned(),
2200            ))
2201        }
2202    }
2203
2204    pub fn ledger_tx_seq_digest(
2205        &self,
2206        tx_seq: u64,
2207    ) -> Result<Option<LedgerTxSeqDigest>, TypedStoreError> {
2208        self.ensure_ledger_history_enabled()?;
2209        Ok(self
2210            .tables
2211            .tx_seq_digest
2212            .get(&tx_seq)?
2213            .map(|info| ledger_tx_seq_digest(tx_seq, info)))
2214    }
2215
2216    pub fn ledger_tx_seq_digest_multi_get(
2217        &self,
2218        tx_seqs: &[u64],
2219    ) -> Result<Vec<Option<LedgerTxSeqDigest>>, TypedStoreError> {
2220        self.ensure_ledger_history_enabled()?;
2221        let rows = self
2222            .tables
2223            .tx_seq_digest
2224            .multi_get(tx_seqs)?
2225            .into_iter()
2226            .zip_debug_eq(tx_seqs.iter().copied())
2227            .map(|(info, tx_seq)| info.map(|info| ledger_tx_seq_digest(tx_seq, info)))
2228            .collect();
2229        Ok(rows)
2230    }
2231
2232    pub fn ledger_tx_seq_digest_iter(
2233        &self,
2234        start: u64,
2235        end_exclusive: u64,
2236        descending: bool,
2237    ) -> Result<LedgerTxSeqDigestIterator<'_>, TypedStoreError> {
2238        self.ensure_ledger_history_enabled()?;
2239        if start >= end_exclusive {
2240            return Ok(Box::new(std::iter::empty()));
2241        }
2242
2243        let iter = if descending {
2244            let upper = end_exclusive - 1;
2245            self.tables
2246                .tx_seq_digest
2247                .reversed_safe_iter_with_bounds(Some(start), Some(upper))?
2248        } else {
2249            self.tables
2250                .tx_seq_digest
2251                .safe_iter_with_bounds(Some(start), Some(end_exclusive))
2252        };
2253
2254        Ok(Box::new(iter.map(|result| {
2255            result.map(|(tx_seq, info)| ledger_tx_seq_digest(tx_seq, info))
2256        })))
2257    }
2258
2259    pub fn transaction_bitmap_bucket_iter(
2260        &self,
2261        dimension_key: Vec<u8>,
2262        start_bucket: u64,
2263        end_bucket_exclusive: u64,
2264        descending: bool,
2265    ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2266        self.ensure_ledger_history_enabled()?;
2267        Self::bitmap_bucket_iter(
2268            &self.tables.transaction_bitmap,
2269            dimension_key,
2270            start_bucket,
2271            end_bucket_exclusive,
2272            descending,
2273        )
2274    }
2275
2276    pub fn event_bitmap_bucket_iter(
2277        &self,
2278        dimension_key: Vec<u8>,
2279        start_bucket: u64,
2280        end_bucket_exclusive: u64,
2281        descending: bool,
2282    ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2283        self.ensure_ledger_history_enabled()?;
2284        Self::bitmap_bucket_iter(
2285            &self.tables.event_bitmap,
2286            dimension_key,
2287            start_bucket,
2288            end_bucket_exclusive,
2289            descending,
2290        )
2291    }
2292
2293    fn bitmap_bucket_iter(
2294        table: &DBMap<BitmapIndexKey, BitmapBlob>,
2295        dimension_key: Vec<u8>,
2296        start_bucket: u64,
2297        end_bucket_exclusive: u64,
2298        descending: bool,
2299    ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2300        if start_bucket >= end_bucket_exclusive {
2301            return Ok(Box::new(std::iter::empty()));
2302        }
2303
2304        let lower = BitmapIndexKey {
2305            dimension_key: dimension_key.clone(),
2306            bucket_id: start_bucket,
2307        };
2308        let upper_exclusive = BitmapIndexKey {
2309            dimension_key,
2310            bucket_id: end_bucket_exclusive,
2311        };
2312        let upper_inclusive = BitmapIndexKey {
2313            dimension_key: upper_exclusive.dimension_key.clone(),
2314            bucket_id: end_bucket_exclusive - 1,
2315        };
2316
2317        let iter: Box<
2318            dyn Iterator<Item = Result<(BitmapIndexKey, BitmapBlob), TypedStoreError>> + '_,
2319        > = if descending {
2320            table.reversed_safe_iter_with_bounds(Some(lower), Some(upper_inclusive))?
2321        } else {
2322            table.safe_iter_with_bounds(Some(lower), Some(upper_exclusive))
2323        };
2324
2325        Ok(Box::new(iter.map(|result| {
2326            result.and_then(|(key, blob)| decode_ledger_bitmap_bucket(key, blob))
2327        })))
2328    }
2329}
2330
2331/// Objects that existed before this transaction but no longer exist after
2332/// (deleted or wrapped). Mirrors `CheckpointTransaction::removed_objects_pre_version`
2333/// for `ExecutedTransaction`, which stores its objects in a shared `ObjectSet`
2334/// keyed by `(id, version)` rather than in dense per-tx vectors.
2335fn tx_removed_objects_pre_version<'a>(
2336    tx: &'a ExecutedTransaction,
2337    object_set: &'a ObjectSet,
2338) -> impl Iterator<Item = &'a Object> + 'a {
2339    tx.effects
2340        .object_changes()
2341        .into_iter()
2342        .filter_map(
2343            move |change| match (change.input_version, change.output_version) {
2344                (Some(input_version), None) => object_set.get(&ObjectKey(change.id, input_version)),
2345                _ => None,
2346            },
2347        )
2348}
2349
2350/// Pairs of `(output_object, optional_input_object)` for every changed object
2351/// (mutated, created, or unwrapped). Mirrors `CheckpointTransaction::changed_objects`
2352/// for `ExecutedTransaction`.
2353fn tx_changed_objects<'a>(
2354    tx: &'a ExecutedTransaction,
2355    object_set: &'a ObjectSet,
2356) -> impl Iterator<Item = (&'a Object, Option<&'a Object>)> + 'a {
2357    tx.effects
2358        .object_changes()
2359        .into_iter()
2360        .filter_map(move |change| {
2361            let output = change
2362                .output_version
2363                .and_then(|v| object_set.get(&ObjectKey(change.id, v)))?;
2364            let input = change
2365                .input_version
2366                .and_then(|v| object_set.get(&ObjectKey(change.id, v)));
2367            Some((output, input))
2368        })
2369}
2370
2371fn should_index_dynamic_field(object: &Object) -> bool {
2372    // Skip any objects that aren't of type `Field<Name, Value>`
2373    //
2374    // All dynamic fields are of type:
2375    //   - Field<Name, Value> for dynamic fields
2376    //   - Field<Wrapper<Name>, ID>> for dynamic field objects where the ID is the id of the pointed
2377    //   to object
2378    //
2379    object
2380        .data
2381        .try_as_move()
2382        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
2383}
2384
2385fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
2386    use sui_types::coin::CoinMetadata;
2387    use sui_types::coin::RegulatedCoinMetadata;
2388    use sui_types::coin::TreasuryCap;
2389
2390    let object_type = object.type_().and_then(MoveObjectType::other)?;
2391
2392    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
2393        return Some((
2394            CoinIndexKey { coin_type },
2395            CoinIndexInfo {
2396                coin_metadata_object_id: Some(object.id()),
2397                treasury_object_id: None,
2398                regulated_coin_metadata_object_id: None,
2399            },
2400        ));
2401    }
2402
2403    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
2404        return Some((
2405            CoinIndexKey { coin_type },
2406            CoinIndexInfo {
2407                coin_metadata_object_id: None,
2408                treasury_object_id: Some(object.id()),
2409                regulated_coin_metadata_object_id: None,
2410            },
2411        ));
2412    }
2413
2414    if let Some(coin_type) =
2415        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
2416    {
2417        return Some((
2418            CoinIndexKey { coin_type },
2419            CoinIndexInfo {
2420                coin_metadata_object_id: None,
2421                treasury_object_id: None,
2422                regulated_coin_metadata_object_id: Some(object.id()),
2423            },
2424        ));
2425    }
2426
2427    None
2428}
2429
/// Factory for per-worker [`RpcLiveObjectIndexer`]s (see its
/// `ParMakeLiveObjectIndexer` impl). Every indexer it creates shares the same
/// tables and coin index and inherits the same batch size limit.
struct RpcParLiveObjectSetIndexer<'a> {
    /// Index tables that the created indexers batch their writes against.
    tables: &'a IndexStoreTables,
    /// Mutex-guarded coin index map shared by all created indexers.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Batch size threshold (compared against `size_in_bytes()`) handed to
    /// each created indexer.
    batch_size_limit: usize,
}
2435
/// Indexes live objects one at a time into a pending write batch (see its
/// `LiveObjectIndexer` impl).
struct RpcLiveObjectIndexer<'a> {
    /// Index tables the batch entries are written against.
    tables: &'a IndexStoreTables,
    /// Pending writes; written out once its size reaches `batch_size_limit`
    /// and again on `finish`.
    batch: typed_store::rocks::DBBatch,
    /// Shared coin index map, updated under the lock in `index_object`.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Balance deltas accumulated per key; moved into `batch` once the number
    /// of keys reaches `BALANCE_FLUSH_THRESHOLD` and again on `finish`.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    /// Size in bytes at which `batch` is written out.
    batch_size_limit: usize,
}
2443
2444impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
2445    type ObjectIndexer = RpcLiveObjectIndexer<'a>;
2446
2447    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
2448        RpcLiveObjectIndexer {
2449            tables: self.tables,
2450            batch: self.tables.owner.batch(),
2451            coin_index: self.coin_index,
2452            balance_changes: HashMap::new(),
2453            batch_size_limit: self.batch_size_limit,
2454        }
2455    }
2456}
2457
2458impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
2459    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
2460        match object.owner {
2461            // Owner Index
2462            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
2463                let owner_key = OwnerIndexKey::from_object(&object);
2464                let owner_info = OwnerIndexInfo::new(&object);
2465                self.batch
2466                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
2467
2468                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
2469                    let balance_key = BalanceKey { owner, coin_type };
2470                    let balance_info = BalanceIndexInfo {
2471                        coin_balance_delta: value.into(),
2472                        address_balance_delta: 0,
2473                    };
2474                    self.balance_changes
2475                        .entry(balance_key)
2476                        .or_default()
2477                        .merge_delta(&balance_info);
2478
2479                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2480                        self.batch.partial_merge_batch(
2481                            &self.tables.balance,
2482                            std::mem::take(&mut self.balance_changes),
2483                        )?;
2484                    }
2485                }
2486            }
2487
2488            // Dynamic Field Index
2489            Owner::ObjectOwner(parent) => {
2490                if should_index_dynamic_field(&object) {
2491                    let field_key = DynamicFieldKey::new(parent, object.id());
2492                    self.batch
2493                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
2494                }
2495
2496                // Index address balances
2497                if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
2498                    && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
2499                {
2500                    let balance_key = BalanceKey { owner, coin_type };
2501                    let balance_info = BalanceIndexInfo {
2502                        coin_balance_delta: 0,
2503                        address_balance_delta: balance,
2504                    };
2505                    self.balance_changes
2506                        .entry(balance_key)
2507                        .or_default()
2508                        .merge_delta(&balance_info);
2509
2510                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2511                        self.batch.partial_merge_batch(
2512                            &self.tables.balance,
2513                            std::mem::take(&mut self.balance_changes),
2514                        )?;
2515                    }
2516                }
2517            }
2518
2519            Owner::Shared { .. } | Owner::Immutable => {}
2520
2521            Owner::Party { .. } => {
2522                // TODO(Party WIP)
2523                todo!("Party WIP");
2524                // We could maybe look at non-default permissions for "owners"?
2525            }
2526        }
2527
2528        // Look for CoinMetadata<T> and TreasuryCap<T> objects
2529        if let Some((key, value)) = try_create_coin_index_info(&object) {
2530            use std::collections::hash_map::Entry;
2531
2532            match self.coin_index.lock().unwrap().entry(key) {
2533                Entry::Occupied(mut o) => {
2534                    o.get_mut().merge(value);
2535                }
2536                Entry::Vacant(v) => {
2537                    v.insert(value);
2538                }
2539            }
2540        }
2541
2542        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
2543            self.batch
2544                .insert_batch(&self.tables.package_version, [(key, info)])?;
2545        }
2546
2547        // If the batch size grows to greater than the limit then write out to the DB so that the
2548        // data we need to hold in memory doesn't grow unbounded.
2549        if self.batch.size_in_bytes() >= self.batch_size_limit {
2550            std::mem::replace(&mut self.batch, self.tables.owner.batch())
2551                .write_opt(bulk_ingestion_write_options())?;
2552        }
2553
2554        Ok(())
2555    }
2556
2557    fn finish(mut self) -> Result<(), StorageError> {
2558        self.batch.partial_merge_batch(
2559            &self.tables.balance,
2560            std::mem::take(&mut self.balance_changes),
2561        )?;
2562        self.batch.write_opt(bulk_ingestion_write_options())?;
2563        Ok(())
2564    }
2565}
2566
2567// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit of refactoring to
2568// make it possible.
2569/// Load a full `Checkpoint` for `checkpoint` from local storage. Sibling
2570/// of [`sparse_checkpoint_for_epoch_backfill`] that returns data for
2571/// every cp (not just genesis / EoE) and always loads transaction events.
2572///
2573/// Returns `Ok(None)` if the cp's summary or contents are not present
2574/// locally (e.g. pruned out of the underlying store).
2575fn full_checkpoint_for_backfill(
2576    authority_store: &AuthorityStore,
2577    checkpoint_store: &CheckpointStore,
2578    checkpoint: u64,
2579) -> Result<Option<Checkpoint>, StorageError> {
2580    let Some(summary) = checkpoint_store.get_checkpoint_by_sequence_number(checkpoint)? else {
2581        return Ok(None);
2582    };
2583    let Some(contents) = checkpoint_store.get_checkpoint_contents(&summary.content_digest)? else {
2584        return Ok(None);
2585    };
2586
2587    // Always load events: event-space dimensions need them, and tx-space
2588    // dimensions include EmitModule / EventType / EventStreamHead which are
2589    // sourced from events too.
2590    let (transactions, object_set) = load_executed_transactions(authority_store, &contents, true)?;
2591
2592    Ok(Some(Checkpoint {
2593        summary: summary.into(),
2594        contents,
2595        transactions,
2596        object_set,
2597    }))
2598}
2599
2600fn sparse_checkpoint_for_epoch_backfill(
2601    authority_store: &AuthorityStore,
2602    checkpoint_store: &CheckpointStore,
2603    checkpoint: u64,
2604) -> Result<Option<Checkpoint>, StorageError> {
2605    let summary = checkpoint_store
2606        .get_checkpoint_by_sequence_number(checkpoint)?
2607        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2608
2609    // Only load genesis and end of epoch checkpoints
2610    if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
2611        return Ok(None);
2612    }
2613
2614    let contents = checkpoint_store
2615        .get_checkpoint_contents(&summary.content_digest)?
2616        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2617
2618    let (transactions, object_set) = load_executed_transactions(authority_store, &contents, false)?;
2619
2620    Ok(Some(Checkpoint {
2621        summary: summary.into(),
2622        contents,
2623        transactions,
2624        object_set,
2625    }))
2626}
2627
2628/// Load `ExecutedTransaction`s for every digest in `contents`, alongside an
2629/// `ObjectSet` populated with their input and output objects. `Checkpoint`
2630/// stores objects in a shared keyed set rather than the per-tx vectors used
2631/// by the old `CheckpointTransaction`, so this is the equivalent shape for
2632/// the backfill loaders to return.
2633fn load_executed_transactions(
2634    authority_store: &AuthorityStore,
2635    contents: &sui_types::messages_checkpoint::CheckpointContents,
2636    load_events: bool,
2637) -> Result<(Vec<ExecutedTransaction>, ObjectSet), StorageError> {
2638    let transaction_digests = contents
2639        .iter()
2640        .map(|execution_digests| execution_digests.transaction)
2641        .collect::<Vec<_>>();
2642    let transactions = authority_store
2643        .multi_get_transaction_blocks(&transaction_digests)?
2644        .into_iter()
2645        .map(|maybe_transaction| {
2646            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
2647        })
2648        .collect::<Result<Vec<_>, _>>()?;
2649
2650    let effects = authority_store
2651        .multi_get_executed_effects(&transaction_digests)?
2652        .into_iter()
2653        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
2654        .collect::<Result<Vec<_>, _>>()?;
2655
2656    let events = if load_events {
2657        authority_store
2658            .multi_get_events(&transaction_digests)
2659            .map_err(|e| StorageError::custom(e.to_string()))?
2660    } else {
2661        vec![None; transaction_digests.len()]
2662    };
2663
2664    let mut full_transactions = Vec::with_capacity(transactions.len());
2665    let mut object_set = ObjectSet::default();
2666    for ((tx, fx), ev) in transactions
2667        .into_iter()
2668        .zip_debug_eq(effects)
2669        .zip_debug_eq(events)
2670    {
2671        let input_objects =
2672            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
2673        let output_objects =
2674            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
2675
2676        for obj in input_objects.into_iter().chain(output_objects.into_iter()) {
2677            object_set.insert(obj);
2678        }
2679
2680        let sender_signed = sui_types::transaction::Transaction::from(tx)
2681            .into_data()
2682            .into_inner();
2683        full_transactions.push(ExecutedTransaction {
2684            transaction: sender_signed.intent_message.value,
2685            signatures: sender_signed.tx_signatures,
2686            effects: fx,
2687            events: ev,
2688            // The backfill index paths (`index_epoch` and
2689            // `write_ledger_history_rows_for_checkpoint`) only read objects
2690            // through `effects.object_changes()`, which never reaches
2691            // unchanged loaded runtime objects. Leaving this empty avoids a
2692            // pointless lookup per checkpoint.
2693            unchanged_loaded_runtime_objects: Vec::new(),
2694        });
2695    }
2696
2697    Ok((full_transactions, object_set))
2698}
2699
2700fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
2701    match Coin::extract_balance_if_coin(object) {
2702        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
2703        Ok(Some(_)) => {
2704            debug!("Coin object {} has non-struct type tag", object.id());
2705            Ok(None)
2706        }
2707        Ok(None) => {
2708            // Not a coin
2709            Ok(None)
2710        }
2711        Err(e) => {
2712            // Corrupted coin data
2713            Err(StorageError::custom(format!(
2714                "Failed to deserialize coin object {}: {}",
2715                object.id(),
2716                e
2717            )))
2718        }
2719    }
2720}
2721
2722fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
2723    let move_object = object.data.try_as_move()?;
2724
2725    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
2726    else {
2727        return None;
2728    };
2729
2730    let (key, value): (
2731        sui_types::accumulator_root::AccumulatorKey,
2732        sui_types::accumulator_root::AccumulatorValue,
2733    ) = move_object.try_into().ok()?;
2734
2735    let balance = value.as_u128()? as i128;
2736    if balance <= 0 {
2737        return None;
2738    }
2739
2740    Some((key.owner, *coin_type, balance))
2741}
2742
2743#[cfg(test)]
2744mod tests {
2745    use super::*;
2746    use std::sync::atomic::AtomicU64;
2747
2748    /// Every column family opened via `open_with_index_options` must have the
2749    /// `disable_write_throttling` override applied. The typed-store derive
2750    /// macro silently falls back to bare `default_db_options()` for any CF
2751    /// missing from `tables_db_options_override`, reverting to RocksDB's
2752    /// default stall triggers (slowdown=20, stop=36) — small enough that ~80
2753    /// L0 files would stop writes entirely. RocksDB persists the effective
2754    /// per-CF options to an `OPTIONS-NNNNNN` file at open; parse it to verify
2755    /// every CF received the override.
2756    #[tokio::test]
2757    async fn open_with_index_options_overrides_every_cf() {
2758        let temp_dir = tempfile::tempdir().unwrap();
2759        let db_path = temp_dir.path().join("rpc-index");
2760
2761        let _tables =
2762            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2763
2764        // Iterate the CFs RocksDB actually wrote to the OPTIONS file rather
2765        // than the schema-declared set, since deprecated CFs are dropped at
2766        // open time and never appear in OPTIONS. RocksDB always writes a
2767        // `default` CF; we exclude it because typed-store doesn't store data
2768        // there and we don't configure it.
2769        let per_cf = parse_cf_options(&db_path);
2770        assert!(
2771            !per_cf.is_empty(),
2772            "expected at least one CFOptions section in OPTIONS file"
2773        );
2774        for (cf_name, opts) in &per_cf {
2775            if cf_name == "default" {
2776                continue;
2777            }
2778            for (key, expected) in [
2779                ("level0_slowdown_writes_trigger", "512"),
2780                ("level0_stop_writes_trigger", "1024"),
2781                ("soft_pending_compaction_bytes_limit", "0"),
2782                ("hard_pending_compaction_bytes_limit", "0"),
2783            ] {
2784                let actual = opts
2785                    .get(key)
2786                    .unwrap_or_else(|| panic!("cf `{cf_name}` missing `{key}`"));
2787                assert_eq!(
2788                    actual, expected,
2789                    "cf `{cf_name}` has `{key}={actual}`, expected `{expected}` — \
2790                     the typed-store override map likely doesn't cover this CF"
2791                );
2792            }
2793        }
2794    }
2795
2796    #[test]
2797    fn checked_encode_event_seq_rejects_unrepresentable_values() {
2798        assert!(
2799            checked_encode_event_seq(0, MAX_EVENTS_PER_TX).is_err(),
2800            "event_idx at MAX_EVENTS_PER_TX must be rejected"
2801        );
2802        assert!(
2803            checked_encode_event_seq(MAX_TX_SEQ + 1, 0).is_err(),
2804            "tx_seq past MAX_TX_SEQ must be rejected"
2805        );
2806    }
2807
2808    /// Compaction filter math: tx-bitmap whole-bucket removability around
2809    /// the tx == 0 boundary. With the pruning atomic at 1, only tx_seq 0
2810    /// is gone. Bucket 0 spans tx_seqs [0, 65_536), so it is NOT entirely
2811    /// pruned and must be kept. This is exactly the off-by-one case the
2812    /// exclusive floor is supposed to make explicit.
2813    #[test]
2814    fn bitmap_filter_keeps_bucket_with_live_tx_above_zero_watermark() {
2815        let watermark = Arc::new(AtomicU64::new(1));
2816        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2817
2818        let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2819            dimension_key: vec![1, 2, 3],
2820            bucket_id: 0,
2821        });
2822        assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2823
2824        // Once the watermark advances to TX_BUCKET_SIZE, bucket 0 becomes
2825        // fully prunable (highest tx in bucket 0 is 65_535, exclusive of
2826        // 65_536 means the next bucket starts there — everything below is
2827        // pruned).
2828        watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
2829        assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2830    }
2831
2832    /// Compaction filter math: event-bitmap removability uses
2833    /// `event_seq_lo(pruned_exclusive)` as the threshold, so its math is
2834    /// scaled by EVENT_BITS relative to tx-bitmap.
2835    #[test]
2836    fn bitmap_filter_event_bucket_uses_event_seq_lo() {
2837        let watermark = Arc::new(AtomicU64::new(0));
2838        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Event);
2839
2840        let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2841            dimension_key: vec![5],
2842            bucket_id: 0,
2843        });
2844        // Watermark 0: nothing pruned → keep.
2845        assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2846
2847        // The highest tx whose event_seq can fall in bucket 0 is
2848        // (EVENT_BUCKET_SIZE / MAX_EVENTS_PER_TX) - 1. Need watermark to
2849        // exceed that for bucket 0 to be fully prunable.
2850        let txs_per_bucket = EVENT_BUCKET_SIZE / MAX_EVENTS_PER_TX as u64;
2851        watermark.store(txs_per_bucket, Ordering::Relaxed);
2852        assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2853    }
2854
2855    /// Malformed keys must never be silently `Remove`d — silent data loss
2856    /// is much worse than a stuck row. A too-short key and a key with a
2857    /// bucket_id that would overflow the bucket-hi computation should
2858    /// both be kept.
2859    #[test]
2860    fn bitmap_filter_keeps_malformed_keys() {
2861        let watermark = Arc::new(AtomicU64::new(u64::MAX));
2862        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2863
2864        assert!(matches!(filter.filter(b"short", &[]), Decision::Keep));
2865        assert!(matches!(filter.filter(&[], &[]), Decision::Keep));
2866
2867        // bucket_id near u64::MAX would overflow `(b+1)*TX_BUCKET_SIZE`.
2868        // The checked math returns None → keep.
2869        let huge = typed_store::be_fix_int_ser(&BitmapIndexKey {
2870            dimension_key: vec![],
2871            bucket_id: u64::MAX - 1,
2872        });
2873        assert!(huge.len() >= 8);
2874        assert!(matches!(filter.filter(&huge, &[]), Decision::Keep));
2875    }
2876
2877    /// Round-trip the typed-store encoding: a `BitmapIndexKey` encoded via
2878    /// `be_fix_int_ser` ends with the bucket_id as 8 big-endian bytes that
2879    /// the compaction filter reads back. Guards against silent drift if
2880    /// typed-store changes its key serializer.
2881    #[test]
2882    fn bitmap_filter_decodes_typed_store_keys() {
2883        let watermark = Arc::new(AtomicU64::new(0));
2884        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2885
2886        // Build a key with a bucket_id that, after advancing the watermark
2887        // far enough, would be removable. Confirm the filter agrees.
2888        let bucket_id = 7u64;
2889        let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2890            dimension_key: vec![0xAA, 0xBB, 0xCC],
2891            bucket_id,
2892        });
2893        // First, with watermark = 0, definitely keep.
2894        assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2895
2896        // Advance watermark past (bucket_id + 1) * TX_BUCKET_SIZE → remove.
2897        watermark.store((bucket_id + 1) * TX_BUCKET_SIZE, Ordering::Relaxed);
2898        assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2899    }
2900
2901    /// The merge operator must OR multiple operands into a single bitmap —
2902    /// not last-write-wins, which is what we'd get without it.
2903    #[test]
2904    fn bitmap_merge_operator_unions_operands() {
2905        let mut bm_a = RoaringBitmap::new();
2906        bm_a.insert(1);
2907        bm_a.insert(5);
2908        let blob_a = encode_bitmap_blob(&bm_a);
2909
2910        let mut bm_b = RoaringBitmap::new();
2911        bm_b.insert(5);
2912        bm_b.insert(7);
2913        let blob_b = encode_bitmap_blob(&bm_b);
2914
2915        let mut bm_c = RoaringBitmap::new();
2916        bm_c.insert(100);
2917        let blob_c = encode_bitmap_blob(&bm_c);
2918
2919        // Simulate rocksdb feeding [blob_b, blob_c] as operands with no
2920        // existing on-disk value (which is what happens on first merge into
2921        // a new key).
2922        //
2923        // We can't easily construct a `MergeOperands` from outside rocksdb,
2924        // so test the decode/encode round-trip via the helpers directly and
2925        // assert the union over decoded bitmaps. This validates the data
2926        // path the merge operator depends on; the operator's loop is a
2927        // trivial `acc |= bm` on top.
2928        let decoded_a = decode_bitmap_blob(&blob_a).expect("decode a");
2929        let decoded_b = decode_bitmap_blob(&blob_b).expect("decode b");
2930        let decoded_c = decode_bitmap_blob(&blob_c).expect("decode c");
2931        let unioned = decoded_a | decoded_b | decoded_c;
2932        let mut expected = RoaringBitmap::new();
2933        for b in [1, 5, 7, 100] {
2934            expected.insert(b);
2935        }
2936        assert_eq!(unioned, expected);
2937    }
2938
2939    /// End-to-end: write merge operands across multiple "checkpoints" into a
2940    /// real DBMap and confirm the merge operator unions them when read back.
2941    /// This is the only test that exercises the in-rocksdb merge path,
2942    /// since `bitmap_merge_operator_unions_operands` only round-trips the
2943    /// helpers.
2944    #[tokio::test]
2945    async fn bitmap_merge_operator_unions_across_writes() {
2946        let temp_dir = tempfile::tempdir().unwrap();
2947        let db_path = temp_dir.path().join("rpc-index");
2948        let tables =
2949            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2950
2951        let key = BitmapIndexKey {
2952            dimension_key: vec![1, 2, 3],
2953            bucket_id: 0,
2954        };
2955
2956        // Write three merge operands targeting the same key. Without the
2957        // merge operator, the last write would clobber the first two; with
2958        // it, all bits should be present.
2959        for bits in [vec![1u32, 2], vec![3, 4], vec![5, 6, 7]] {
2960            let mut bm = RoaringBitmap::new();
2961            for b in bits {
2962                bm.insert(b);
2963            }
2964            let mut batch = tables.transaction_bitmap.batch();
2965            batch
2966                .partial_merge_batch(
2967                    &tables.transaction_bitmap,
2968                    [(key.clone(), BitmapBlob::from(bm))],
2969                )
2970                .unwrap();
2971            batch.write().unwrap();
2972        }
2973
2974        let blob = tables
2975            .transaction_bitmap
2976            .get(&key)
2977            .unwrap()
2978            .expect("merged row should exist");
2979        let bm = RoaringBitmap::deserialize_from(&blob.0[..]).unwrap();
2980        let got: Vec<u32> = bm.iter().collect();
2981        assert_eq!(got, vec![1, 2, 3, 4, 5, 6, 7]);
2982    }
2983
2984    /// Whole-bucket compaction-filter removal: write bits to buckets 0 and
2985    /// 1, advance the pruning watermark past bucket 0 only, force a
2986    /// compaction, then assert bucket 0 is gone and bucket 1 survives.
2987    #[tokio::test]
2988    async fn bitmap_filter_removes_whole_bucket_after_compaction() {
2989        let temp_dir = tempfile::tempdir().unwrap();
2990        let db_path = temp_dir.path().join("rpc-index");
2991
2992        let watermark = Arc::new(AtomicU64::new(0));
2993        let index_options = IndexStoreOptions {
2994            pruning_tx_seq_exclusive: watermark.clone(),
2995        };
2996        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
2997
2998        let dim_key = vec![0x01, 0xAA];
2999        let k0 = BitmapIndexKey {
3000            dimension_key: dim_key.clone(),
3001            bucket_id: 0,
3002        };
3003        let k1 = BitmapIndexKey {
3004            dimension_key: dim_key.clone(),
3005            bucket_id: 1,
3006        };
3007        let mut bm = RoaringBitmap::new();
3008        bm.insert(0);
3009
3010        // Use a direct insert rather than a merge so we exercise the
3011        // compaction filter on a regular value. (Merge interaction is
3012        // covered by `bitmap_merge_operator_unions_across_writes`.)
3013        let blob = BitmapBlob::from(bm);
3014        let mut batch = tables.transaction_bitmap.batch();
3015        batch
3016            .insert_batch(
3017                &tables.transaction_bitmap,
3018                [(k0.clone(), blob.clone()), (k1.clone(), blob)],
3019            )
3020            .unwrap();
3021        batch.write().unwrap();
3022        tables.transaction_bitmap.flush().unwrap();
3023
3024        // Sanity-check: both buckets are present before advancing the
3025        // watermark.
3026        assert!(tables.transaction_bitmap.get(&k0).unwrap().is_some());
3027        assert!(tables.transaction_bitmap.get(&k1).unwrap().is_some());
3028
3029        // Advance watermark past bucket 0 but not past bucket 1.
3030        watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
3031
3032        // Compact the entire keyspace with raw byte bounds to ensure we
3033        // cover every encoded BitmapIndexKey, regardless of typed-store's
3034        // length-prefix width.
3035        tables
3036            .transaction_bitmap
3037            .compact_range_raw("transaction_bitmap", vec![], vec![0xFF; 128])
3038            .unwrap();
3039
3040        assert!(
3041            tables.transaction_bitmap.get(&k0).unwrap().is_none(),
3042            "bucket 0 should have been removed by the compaction filter"
3043        );
3044        assert!(
3045            tables.transaction_bitmap.get(&k1).unwrap().is_some(),
3046            "bucket 1 should still be present (only bucket 0 was below the watermark)"
3047        );
3048    }
3049
3050    /// The retained backfill range must surface missing cps as an error,
3051    /// not silently leave a permanent hole. This tests the
3052    /// `ok_or_else(...)` conversion isolated from the full init path.
3053    #[test]
3054    fn backfill_missing_cp_in_retained_range_is_error() {
3055        // Mirror the backfill closure: take the
3056        // `Result<Option<Checkpoint>>` from the loader, and require
3057        // `Some(checkpoint)` for every cp in the retained range.
3058        let checkpoint_range = 5u64..=10u64;
3059        let seq = 7u64;
3060        let loaded: Result<Option<Checkpoint>, StorageError> = Ok(None);
3061
3062        let result: Result<(), StorageError> = (|| {
3063            let checkpoint = loaded?.ok_or_else(|| {
3064                StorageError::missing(format!(
3065                    "ledger history backfill: checkpoint {seq} is missing from local storage \
3066                     but falls inside the retained backfill range {checkpoint_range:?}"
3067                ))
3068            })?;
3069            let _ = checkpoint;
3070            Ok(())
3071        })();
3072
3073        let err = result.expect_err("missing cp must error out, not silently succeed");
3074        let msg = err.to_string();
3075        assert!(
3076            msg.contains(&format!("checkpoint {seq}")),
3077            "error should name the missing cp: {msg}"
3078        );
3079        assert!(
3080            msg.contains("5..=10"),
3081            "error should name the retained range: {msg}"
3082        );
3083    }
3084
3085    /// Existing watermark rows on disk encode `Indexed` and `Pruned` as serde
3086    /// indexes 0 and 1. Reordering the enum (or inserting a variant before
3087    /// them) would shift those discriminants and silently misread on-disk
3088    /// rows. Hardcode the legacy bytes and confirm they still round-trip.
3089    #[test]
3090    fn legacy_watermark_bytes_still_deserialize() {
3091        // BCS encodes a unit enum variant as a ULEB128 of its index.
3092        let indexed_bytes = bcs::to_bytes(&Watermark::Indexed).unwrap();
3093        let pruned_bytes = bcs::to_bytes(&Watermark::Pruned).unwrap();
3094        assert_eq!(
3095            indexed_bytes,
3096            vec![0],
3097            "Watermark::Indexed must encode as 0"
3098        );
3099        assert_eq!(pruned_bytes, vec![1], "Watermark::Pruned must encode as 1");
3100
3101        // Feed the canonical legacy bytes to the deserializer and confirm
3102        // they still arrive at the right variants. This is the test that
3103        // would catch an accidental reorder of the enum.
3104        let decoded_indexed: Watermark = bcs::from_bytes(&[0]).unwrap();
3105        let decoded_pruned: Watermark = bcs::from_bytes(&[1]).unwrap();
3106        assert!(matches!(decoded_indexed, Watermark::Indexed));
3107        assert!(matches!(decoded_pruned, Watermark::Pruned));
3108    }
3109
3110    /// The schema version and the ledger-history feature flag are tracked
3111    /// independently: the flag round-trips through the `settings` CF, and
3112    /// *enabling* the feature forces a reinit while the schema version is
3113    /// unchanged. *Disabling* it does not (that is handled in place).
3114    #[tokio::test]
3115    async fn settings_cf_round_trips_and_drives_reinit() {
3116        let temp_dir = tempfile::tempdir().unwrap();
3117        let db_path = temp_dir.path().join("rpc-index");
3118        let tables =
3119            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3120
3121        // Stamp a current-schema DB built with ledger history disabled.
3122        tables
3123            .meta
3124            .insert(
3125                &(),
3126                &MetadataInfo {
3127                    version: CURRENT_DB_VERSION,
3128                },
3129            )
3130            .unwrap();
3131        tables
3132            .settings
3133            .insert(
3134                &(),
3135                &encode_settings(&IndexSettings {
3136                    ledger_history_indexing: false,
3137                }),
3138            )
3139            .unwrap();
3140        assert!(!tables.persisted_ledger_history_indexing());
3141
3142        // An empty checkpoint store keeps the watermark check out of the way so
3143        // we isolate the schema-version / feature-toggle logic.
3144        let checkpoint_store = CheckpointStore::new_for_tests();
3145
3146        // Same schema, same feature setting => no reinit.
3147        assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3148        // Same schema, but the feature was toggled on => reinit.
3149        assert!(tables.needs_to_do_initialization(&checkpoint_store, true));
3150
3151        // Flip the persisted flag; it round-trips.
3152        tables
3153            .settings
3154            .insert(
3155                &(),
3156                &encode_settings(&IndexSettings {
3157                    ledger_history_indexing: true,
3158                }),
3159            )
3160            .unwrap();
3161        assert!(tables.persisted_ledger_history_indexing());
3162        // Already enabled => no reinit.
3163        assert!(!tables.needs_to_do_initialization(&checkpoint_store, true));
3164        // Disabling does NOT force a rebuild — the history CFs are dropped in
3165        // place instead.
3166        assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3167    }
3168
3169    /// `disable_ledger_history_indexing` empties every ledger-history CF (not
3170    /// all-but-the-last, the trap with end-exclusive range deletes) and clears
3171    /// the persisted flag, while leaving the base indexes untouched.
3172    #[tokio::test]
3173    async fn disable_ledger_history_indexing_drops_history_cfs_in_place() {
3174        let temp_dir = tempfile::tempdir().unwrap();
3175        let db_path = temp_dir.path().join("rpc-index");
3176        let tables =
3177            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3178
3179        // Seed a ledger-history DB with several rows in each history CF.
3180        tables
3181            .settings
3182            .insert(
3183                &(),
3184                &encode_settings(&IndexSettings {
3185                    ledger_history_indexing: true,
3186                }),
3187            )
3188            .unwrap();
3189        for tx_seq in 0..5u64 {
3190            tables
3191                .tx_seq_digest
3192                .insert(
3193                    &tx_seq,
3194                    &TxSeqDigestInfo {
3195                        digest: TransactionDigest::new([0; 32]),
3196                        event_count: 0,
3197                        tx_offset: 0,
3198                        checkpoint_number: 0,
3199                    },
3200                )
3201                .unwrap();
3202        }
3203        for bucket_id in 0..5u64 {
3204            let key = BitmapIndexKey {
3205                dimension_key: vec![1, 2, 3],
3206                bucket_id,
3207            };
3208            tables
3209                .transaction_bitmap
3210                .insert(&key, &BitmapBlob(vec![0xab]))
3211                .unwrap();
3212            tables
3213                .event_bitmap
3214                .insert(&key, &BitmapBlob(vec![0xcd]))
3215                .unwrap();
3216        }
3217        // A non-history CF that must survive the drop.
3218        tables.watermark.insert(&Watermark::Indexed, &42).unwrap();
3219
3220        tables.disable_ledger_history_indexing().unwrap();
3221
3222        // Every history row is gone — including the last key in each CF.
3223        assert!(tables.tx_seq_digest.is_empty());
3224        assert!(tables.transaction_bitmap.is_empty());
3225        assert!(tables.event_bitmap.is_empty());
3226        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), None);
3227
3228        // The flag is cleared and the base index is untouched.
3229        assert!(!tables.persisted_ledger_history_indexing());
3230        assert_eq!(tables.watermark.get(&Watermark::Indexed).unwrap(), Some(42));
3231    }
3232
3233    /// `prune()` advances `Watermark::Pruned` and deletes the exact
3234    /// tx_seq_digest range below the new tx-seq floor when enabled.
3235    #[tokio::test]
3236    async fn prune_maintains_ledger_history_state_when_active() {
3237        let temp_dir = tempfile::tempdir().unwrap();
3238        let db_path = temp_dir.path().join("rpc-index");
3239        let tables =
3240            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3241
3242        // Seed rows to simulate an indexed ledger history subsystem.
3243        let mut batch = tables.tx_seq_digest.batch();
3244        for tx_seq in 0..5u64 {
3245            batch
3246                .insert_batch(
3247                    &tables.tx_seq_digest,
3248                    [(
3249                        tx_seq,
3250                        TxSeqDigestInfo {
3251                            digest: TransactionDigest::new([0; 32]),
3252                            event_count: 0,
3253                            tx_offset: 0,
3254                            checkpoint_number: 0,
3255                        },
3256                    )],
3257                )
3258                .unwrap();
3259        }
3260        batch.write().unwrap();
3261
3262        // Prune cp 1 with an absolute tx-seq floor of 3 → rows 0..3 should
3263        // be deleted, the derived floor advances from 0 to 3.
3264        let pruning_atomic = AtomicU64::new(0);
3265        tables
3266            .prune(1, 3, /*ledger_history_enabled=*/ true, &pruning_atomic)
3267            .unwrap();
3268        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3269
3270        assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3271        for tx_seq in 0..3u64 {
3272            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3273        }
3274        for tx_seq in 3..5u64 {
3275            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3276        }
3277        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3278    }
3279
3280    /// When disabled, `prune` must advance only `Watermark::Pruned`.
3281    #[tokio::test]
3282    async fn prune_skips_ledger_history_state_when_inactive() {
3283        let temp_dir = tempfile::tempdir().unwrap();
3284        let db_path = temp_dir.path().join("rpc-index");
3285        let tables =
3286            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3287
3288        // Seed a tx_seq_digest row so we can confirm prune leaves it alone.
3289        tables
3290            .tx_seq_digest
3291            .insert(
3292                &0u64,
3293                &TxSeqDigestInfo {
3294                    digest: TransactionDigest::new([0; 32]),
3295                    event_count: 0,
3296                    tx_offset: 0,
3297                    checkpoint_number: 0,
3298                },
3299            )
3300            .unwrap();
3301
3302        let pruning_atomic = AtomicU64::new(0);
3303        tables
3304            .prune(
3305                5,
3306                3,
3307                /*ledger_history_enabled=*/ false,
3308                &pruning_atomic,
3309            )
3310            .unwrap();
3311        assert_eq!(
3312            pruning_atomic.load(Ordering::Relaxed),
3313            0,
3314            "disabled prune must not advance the compaction-filter atomic"
3315        );
3316        assert_eq!(
3317            tables.watermark.get(&Watermark::Pruned).unwrap(),
3318            Some(5),
3319            "base pruning must still advance"
3320        );
3321        assert!(
3322            tables.tx_seq_digest.get(&0u64).unwrap().is_some(),
3323            "tx_seq_digest rows must remain untouched when inactive"
3324        );
3325    }
3326
3327    /// Forward `index_checkpoint` writes ledger history rows only when enabled.
3328    #[tokio::test]
3329    async fn index_checkpoint_gates_on_ledger_history_enabled() {
3330        use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3331
3332        let temp_dir = tempfile::tempdir().unwrap();
3333        let db_path = temp_dir.path().join("rpc-index");
3334        let tables =
3335            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3336
3337        // Non-zero cp seq to skip the genesis path in `index_epoch` (which
3338        // expects a real system state object).
3339        let checkpoint = TestCheckpointBuilder::new(1)
3340            .start_transaction(1)
3341            .finish_transaction()
3342            .build_checkpoint();
3343
3344        // Disabled: no ledger history writes.
3345        let batch = tables
3346            .index_checkpoint(&checkpoint, /*ledger_history_enabled=*/ false)
3347            .expect("index_checkpoint failed");
3348        batch.write().expect("batch write failed");
3349        assert_eq!(tables.tx_seq_digest.safe_iter().count(), 0);
3350        assert_eq!(tables.transaction_bitmap.safe_iter().count(), 0);
3351        assert_eq!(tables.event_bitmap.safe_iter().count(), 0);
3352
3353        // Enabled → forward writes land.
3354        let checkpoint2 = TestCheckpointBuilder::new(2)
3355            .start_transaction(1)
3356            .finish_transaction()
3357            .build_checkpoint();
3358        let batch = tables
3359            .index_checkpoint(&checkpoint2, /*ledger_history_enabled=*/ true)
3360            .expect("index_checkpoint failed");
3361        batch.write().expect("batch write failed");
3362        assert!(
3363            tables.tx_seq_digest.safe_iter().count() > 0,
3364            "tx_seq_digest must have rows when ledger_history_enabled=true"
3365        );
3366    }
3367
3368    /// `tx_offset` records each transaction's zero-based position *within its
3369    /// checkpoint*, not its global `tx_sequence_number`, and restarts at 0 on
3370    /// every checkpoint boundary.
3371    #[tokio::test]
3372    async fn index_checkpoint_records_within_checkpoint_tx_offset() {
3373        use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3374
3375        let temp_dir = tempfile::tempdir().unwrap();
3376        let db_path = temp_dir.path().join("rpc-index");
3377        let tables =
3378            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3379
3380        // Checkpoint 1: three txs over a non-zero tx-seq base (100), so the
3381        // global tx_seqs are 100,101,102 — distinct from the offsets 0,1,2.
3382        let checkpoint1 = TestCheckpointBuilder::new(1)
3383            .with_network_total_transactions(100)
3384            .start_transaction(0)
3385            .finish_transaction()
3386            .start_transaction(1)
3387            .finish_transaction()
3388            .start_transaction(2)
3389            .finish_transaction()
3390            .build_checkpoint();
3391        tables
3392            .index_checkpoint(&checkpoint1, true)
3393            .expect("index_checkpoint failed")
3394            .write()
3395            .expect("batch write failed");
3396
3397        // Checkpoint 2: two more txs; global tx_seqs continue at 103,104 but the
3398        // offsets restart at 0.
3399        let checkpoint2 = TestCheckpointBuilder::new(2)
3400            .with_network_total_transactions(103)
3401            .start_transaction(0)
3402            .finish_transaction()
3403            .start_transaction(1)
3404            .finish_transaction()
3405            .build_checkpoint();
3406        tables
3407            .index_checkpoint(&checkpoint2, true)
3408            .expect("index_checkpoint failed")
3409            .write()
3410            .expect("batch write failed");
3411
3412        let offset_by_tx_seq: std::collections::BTreeMap<u64, u32> = tables
3413            .tx_seq_digest
3414            .safe_iter()
3415            .map(|row| row.map(|(tx_seq, info)| (tx_seq, info.tx_offset)))
3416            .collect::<Result<_, _>>()
3417            .unwrap();
3418
3419        assert_eq!(
3420            offset_by_tx_seq,
3421            std::collections::BTreeMap::from([(100, 0), (101, 1), (102, 2), (103, 0), (104, 1),]),
3422            "tx_offset must be the within-checkpoint position, not the global tx_seq"
3423        );
3424    }
3425
3426    /// `prune()` commits the tx_seq_digest range delete and the
3427    /// `Watermark::Pruned` advance in one atomic batch. After a successful
3428    /// prune both halves are present: no tx_seq_digest row remains in the
3429    /// deleted range, AND `Watermark::Pruned` is at the new value.
3430    #[tokio::test]
3431    async fn prune_commits_deletes_and_watermark_atomically() {
3432        let temp_dir = tempfile::tempdir().unwrap();
3433        let db_path = temp_dir.path().join("rpc-index");
3434        let tables =
3435            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3436
3437        let mut batch = tables.tx_seq_digest.batch();
3438        for tx_seq in 0..4u64 {
3439            batch
3440                .insert_batch(
3441                    &tables.tx_seq_digest,
3442                    [(
3443                        tx_seq,
3444                        TxSeqDigestInfo {
3445                            digest: TransactionDigest::new([0; 32]),
3446                            event_count: 0,
3447                            tx_offset: 0,
3448                            checkpoint_number: 0,
3449                        },
3450                    )],
3451                )
3452                .unwrap();
3453        }
3454        batch.write().unwrap();
3455
3456        let pruning_atomic = AtomicU64::new(0);
3457        tables
3458            .prune(1, 2, /*ledger_history_enabled=*/ true, &pruning_atomic)
3459            .unwrap();
3460
3461        // After the single atomic batch lands, both halves are present: the
3462        // deleted rows AND the advanced `Watermark::Pruned`.
3463        assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3464        for tx_seq in 0..2u64 {
3465            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3466        }
3467        for tx_seq in 2..4u64 {
3468            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3469        }
3470        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(2));
3471        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 2);
3472    }
3473
3474    /// Replaying a prune with an unchanged floor is a no-op: the second call
3475    /// does not advance the compaction-filter atomic and leaves rows +
3476    /// watermark untouched. Covers crash-replay where the pruner re-issues the
3477    /// same prune.
3478    #[tokio::test]
3479    async fn prune_idempotent_replay() {
3480        let temp_dir = tempfile::tempdir().unwrap();
3481        let db_path = temp_dir.path().join("rpc-index");
3482        let tables =
3483            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3484
3485        let mut batch = tables.tx_seq_digest.batch();
3486        for tx_seq in 0..5u64 {
3487            batch
3488                .insert_batch(
3489                    &tables.tx_seq_digest,
3490                    [(
3491                        tx_seq,
3492                        TxSeqDigestInfo {
3493                            digest: TransactionDigest::new([0; 32]),
3494                            event_count: 0,
3495                            tx_offset: 0,
3496                            checkpoint_number: 0,
3497                        },
3498                    )],
3499                )
3500                .unwrap();
3501        }
3502        batch.write().unwrap();
3503
3504        let pruning_atomic = AtomicU64::new(0);
3505        tables.prune(1, 3, true, &pruning_atomic).unwrap();
3506        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3507        // Same floor again: nothing new to delete, floor already at 3.
3508        tables.prune(1, 3, true, &pruning_atomic).unwrap();
3509        assert_eq!(
3510            pruning_atomic.load(Ordering::Relaxed),
3511            3,
3512            "replay must not move the atomic"
3513        );
3514        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3515        for tx_seq in 3..5u64 {
3516            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3517        }
3518    }
3519
3520    /// Consecutive prunes advance the floor across an existing range tombstone:
3521    /// the second prune derives `prev_exclusive` from `first_tx_seq_digest_key`,
3522    /// which must read *through* the first tombstone (only possible because the
3523    /// CF is opened with `ignore_range_deletions = false`).
3524    #[tokio::test]
3525    async fn prune_consecutive_ranges_advance_floor() {
3526        let temp_dir = tempfile::tempdir().unwrap();
3527        let db_path = temp_dir.path().join("rpc-index");
3528        let tables =
3529            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3530
3531        let mut batch = tables.tx_seq_digest.batch();
3532        for tx_seq in 0..6u64 {
3533            batch
3534                .insert_batch(
3535                    &tables.tx_seq_digest,
3536                    [(
3537                        tx_seq,
3538                        TxSeqDigestInfo {
3539                            digest: TransactionDigest::new([0; 32]),
3540                            event_count: 0,
3541                            tx_offset: 0,
3542                            checkpoint_number: 0,
3543                        },
3544                    )],
3545                )
3546                .unwrap();
3547        }
3548        batch.write().unwrap();
3549
3550        let pruning_atomic = AtomicU64::new(0);
3551        tables.prune(1, 3, true, &pruning_atomic).unwrap();
3552        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3553        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3554        // Second prune must see floor 3 (through the tombstone) and extend it to 5.
3555        tables.prune(2, 5, true, &pruning_atomic).unwrap();
3556        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 5);
3557        for tx_seq in 0..5u64 {
3558            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3559        }
3560        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(5));
3561    }
3562
3563    /// `new_without_init` must honor an already-built ledger history DB.
3564    #[tokio::test]
3565    async fn new_without_init_enables_ledger_history_for_db_with_ledger_history_setting() {
3566        let temp_dir = tempfile::tempdir().unwrap();
3567        let db_path = temp_dir.path().join("rpc-index");
3568
3569        // Seed a DB built with ledger history indexing.
3570        {
3571            let tables =
3572                IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3573            tables
3574                .meta
3575                .insert(
3576                    &(),
3577                    &MetadataInfo {
3578                        version: CURRENT_DB_VERSION,
3579                    },
3580                )
3581                .unwrap();
3582            tables
3583                .settings
3584                .insert(
3585                    &(),
3586                    &encode_settings(&IndexSettings {
3587                        ledger_history_indexing: true,
3588                    }),
3589                )
3590                .unwrap();
3591            let mut batch = tables.tx_seq_digest.batch();
3592            for tx_seq in 100..105u64 {
3593                batch
3594                    .insert_batch(
3595                        &tables.tx_seq_digest,
3596                        [(
3597                            tx_seq,
3598                            TxSeqDigestInfo {
3599                                digest: TransactionDigest::new([0; 32]),
3600                                event_count: 0,
3601                                tx_offset: 0,
3602                                checkpoint_number: 0,
3603                            },
3604                        )],
3605                    )
3606                    .unwrap();
3607            }
3608            batch.write().unwrap();
3609        }
3610
3611        let store = RpcIndexStore::new_without_init(temp_dir.path());
3612        assert!(
3613            store.ledger_history_enabled,
3614            "new_without_init on a ledger-history DB must enable ledger history indexing"
3615        );
3616        let atomic = &store.ledger_history_pruning_watermark;
3617        assert_eq!(
3618            atomic.load(Ordering::Relaxed),
3619            100,
3620            "pruning atomic must be hydrated from the first tx_seq_digest key"
3621        );
3622
3623        // Pruning through tx-seq floor 103 deletes rows [100, 103).
3624        store.prune(7, 103).unwrap();
3625
3626        for tx_seq in 100..103u64 {
3627            assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3628        }
3629        for tx_seq in 103..105u64 {
3630            assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3631        }
3632        assert_eq!(
3633            store.tables.first_tx_seq_digest_key().unwrap(),
3634            Some(103),
3635            "prune must advance the derived tx-seq floor"
3636        );
3637        assert_eq!(
3638            atomic.load(Ordering::Relaxed),
3639            103,
3640            "prune must advance the compaction-filter atomic"
3641        );
3642    }
3643
3644    /// A DB without the ledger-history setting stays disabled in `new_without_init`.
3645    #[tokio::test]
3646    async fn new_without_init_disables_ledger_history_for_db_without_ledger_history_setting() {
3647        // Case 1: fresh/empty DB.
3648        let temp_dir = tempfile::tempdir().unwrap();
3649        let store = RpcIndexStore::new_without_init(temp_dir.path());
3650        assert!(
3651            !store.ledger_history_enabled,
3652            "new_without_init on a fresh DB must leave ledger history indexing disabled"
3653        );
3654
3655        store.prune(5, 0).unwrap();
3656        assert_eq!(
3657            store.tables.watermark.get(&Watermark::Pruned).unwrap(),
3658            Some(5)
3659        );
3660        assert_eq!(
3661            store.tables.first_tx_seq_digest_key().unwrap(),
3662            None,
3663            "disabled ledger history indexing must leave tx_seq_digest untouched"
3664        );
3665
3666        // Case 2: a current-schema DB with no `settings` row (e.g. a pre-feature
3667        // DB). The missing row must read as ledger history disabled.
3668        let temp_dir = tempfile::tempdir().unwrap();
3669        let db_path = temp_dir.path().join("rpc-index");
3670        {
3671            let tables =
3672                IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3673            tables
3674                .meta
3675                .insert(
3676                    &(),
3677                    &MetadataInfo {
3678                        version: CURRENT_DB_VERSION,
3679                    },
3680                )
3681                .unwrap();
3682        }
3683        let store = RpcIndexStore::new_without_init(temp_dir.path());
3684        assert!(
3685            !store.ledger_history_enabled,
3686            "new_without_init on a DB with no settings row must leave ledger history indexing disabled"
3687        );
3688    }
3689
3690    /// Parse the newest `OPTIONS-NNNNNN` file in `db_path` into a map keyed by
3691    /// column family name. RocksDB writes one such file on every open with a
3692    /// section per CF in INI-like format.
3693    fn parse_cf_options(db_path: &Path) -> HashMap<String, HashMap<String, String>> {
3694        let mut options_file: Option<(u64, PathBuf)> = None;
3695        for entry in std::fs::read_dir(db_path).expect("read_dir failed") {
3696            let entry = entry.unwrap();
3697            let name = entry.file_name().to_string_lossy().into_owned();
3698            let Some(rest) = name.strip_prefix("OPTIONS-") else {
3699                continue;
3700            };
3701            // Skip transient files like `OPTIONS-NNNNNN.dbtmp`.
3702            let Ok(seq) = rest.parse::<u64>() else {
3703                continue;
3704            };
3705            if options_file.as_ref().is_none_or(|(s, _)| seq > *s) {
3706                options_file = Some((seq, entry.path()));
3707            }
3708        }
3709        let (_, path) = options_file.expect("no OPTIONS-* file written");
3710        let content = std::fs::read_to_string(&path).expect("read OPTIONS failed");
3711
3712        let mut result: HashMap<String, HashMap<String, String>> = HashMap::new();
3713        let mut current_cf: Option<String> = None;
3714        for line in content.lines() {
3715            let line = line.trim();
3716            if let Some(rest) = line.strip_prefix("[CFOptions \"") {
3717                let cf_name = rest.trim_end_matches("\"]").to_string();
3718                current_cf = Some(cf_name);
3719            } else if line.starts_with('[') {
3720                // Any other section ends the CFOptions block.
3721                current_cf = None;
3722            } else if let Some(cf) = current_cf.as_ref()
3723                && let Some((k, v)) = line.split_once('=')
3724            {
3725                result
3726                    .entry(cf.clone())
3727                    .or_default()
3728                    .insert(k.trim().to_string(), v.trim().to_string());
3729            }
3730        }
3731        result
3732    }
3733}