sui_core/
rpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use roaring::RoaringBitmap;
15use rustc_hash::{FxHashMap, FxHashSet};
16use serde::Deserialize;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use std::collections::{BTreeMap, HashMap};
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::sync::Mutex;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26use std::time::Duration;
27use std::time::Instant;
28use sui_inverted_index::encode_dimension_key;
29use sui_inverted_index::for_each_event_dimension;
30use sui_inverted_index::for_each_transaction_dimension;
31use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
32use sui_types::base_types::MoveObjectType;
33use sui_types::base_types::ObjectID;
34use sui_types::base_types::SequenceNumber;
35use sui_types::base_types::SuiAddress;
36use sui_types::coin::Coin;
37use sui_types::committee::EpochId;
38use sui_types::digests::TransactionDigest;
39use sui_types::effects::TransactionEffectsAPI;
40use sui_types::full_checkpoint_content::Checkpoint;
41use sui_types::full_checkpoint_content::ExecutedTransaction;
42use sui_types::full_checkpoint_content::ObjectSet;
43use sui_types::messages_checkpoint::CheckpointSequenceNumber;
44use sui_types::object::Data;
45use sui_types::object::Object;
46use sui_types::object::Owner;
47use sui_types::storage::BackingPackageStore;
48use sui_types::storage::DynamicFieldKey;
49use sui_types::storage::EpochInfo;
50use sui_types::storage::LedgerBitmapBucket;
51use sui_types::storage::LedgerBitmapBucketIterator;
52use sui_types::storage::LedgerTxSeqDigest;
53use sui_types::storage::LedgerTxSeqDigestIterator;
54use sui_types::storage::ObjectKey;
55use sui_types::storage::TransactionInfo;
56use sui_types::storage::error::Error as StorageError;
57use sui_types::sui_system_state::SuiSystemStateTrait;
58use sui_types::transaction::TransactionDataAPI;
59use sysinfo::{MemoryRefreshKind, RefreshKind, System};
60use tracing::{debug, info, warn};
61use typed_store::DBMapUtils;
62use typed_store::TypedStoreError;
63use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
64use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
65use typed_store::traits::Map;
66
// On-disk schema version of the index DB. Per the `IndexStoreTables` docs this
// must be bumped whenever a table is added or an existing table's schema changes.
const CURRENT_DB_VERSION: u64 = 4;

// I tried increasing this to 100k and 1M and it didn't speed up indexing at all.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;

// Bitmap inverted index constants
// A change to these constants requires bumping CURRENT_DB_VERSION
//
// 2^16 tx_seqs per `transaction_bitmap` bucket.
const TX_BUCKET_SIZE: u64 = 65_536;
// 2^28: 4,096 transactions per bucket. Must match sui-kvstore's
// `event_bitmap_index::BUCKET_SIZE` and the reader's `EVENT_BITMAP_BUCKET_SIZE`.
const EVENT_BUCKET_SIZE: u64 = 268_435_456;
// Number of low bits of a packed event_seq that hold the event index; the
// remaining high bits hold the tx_seq (see `checked_encode_event_seq`).
const EVENT_BITS: u32 = 16;
// Exclusive upper bound on an event index that fits in `EVENT_BITS` bits.
const MAX_EVENTS_PER_TX: u32 = 1 << EVENT_BITS;
// Largest tx_seq that can be shifted left by `EVENT_BITS` without losing bits.
const MAX_TX_SEQ: u64 = u64::MAX >> EVENT_BITS;

// Compile-time sanity checks on the constants above.
// Bucket sizes must fit in a u32 (presumably because in-bucket offsets are
// stored in a `RoaringBitmap`, which holds u32 values — confirm at the writer).
const _: () = assert!(TX_BUCKET_SIZE <= u32::MAX as u64);
const _: () = assert!(EVENT_BUCKET_SIZE <= u32::MAX as u64);
// The shifts by `EVENT_BITS` must be valid for both u32 and u64 operands.
const _: () = assert!(EVENT_BITS < u32::BITS);
const _: () = assert!(EVENT_BITS < u64::BITS);
const _: () = assert!(MAX_EVENTS_PER_TX as u64 == 1u64 << EVENT_BITS);
// Event buckets start on a multiple of `MAX_EVENTS_PER_TX`, so all packed
// event_seqs of one transaction fall into the same bucket.
const _: () = assert!(EVENT_BUCKET_SIZE.is_multiple_of(MAX_EVENTS_PER_TX as u64));
88
89fn checked_encode_event_seq(tx_seq: u64, event_idx: u32) -> Result<u64, StorageError> {
90    if event_idx >= MAX_EVENTS_PER_TX {
91        return Err(StorageError::custom(format!(
92            "event_idx {event_idx} exceeds packed event-seq limit {}",
93            MAX_EVENTS_PER_TX - 1
94        )));
95    }
96    if tx_seq > MAX_TX_SEQ {
97        return Err(StorageError::custom(format!(
98            "tx_seq {tx_seq} exceeds packed event-seq limit {MAX_TX_SEQ}"
99        )));
100    }
101    Ok((tx_seq << EVENT_BITS) | (event_idx as u64))
102}
103
104/// Lowest packed event_seq for a given `tx_seq` (idx 0), as an `Option` so
105/// the compaction filter (and other untrusted-input callers) get a clean
106/// `None` instead of an overflowing shift when `tx_seq > MAX_TX_SEQ`.
107fn checked_event_seq_lo(tx_seq: u64) -> Option<u64> {
108    if tx_seq <= MAX_TX_SEQ {
109        Some(tx_seq << EVENT_BITS)
110    } else {
111        None
112    }
113}
114
115fn bulk_ingestion_write_options() -> WriteOptions {
116    let mut opts = WriteOptions::default();
117    opts.disable_wal(true);
118    opts
119}
120
121/// Get available memory, respecting cgroup limits in containerized environments
122fn get_available_memory() -> u64 {
123    // RefreshKind::nothing().with_memory() avoids collecting other, slower stats
124    let mut sys = System::new_with_specifics(
125        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
126    );
127    sys.refresh_memory();
128
129    // Check if we have cgroup limits
130    if let Some(cgroup_limits) = sys.cgroup_limits() {
131        let memory_limit = cgroup_limits.total_memory;
132        // cgroup_limits.total_memory is 0 when there's no limit
133        if memory_limit > 0 {
134            debug!("Using cgroup memory limit: {} bytes", memory_limit);
135            return memory_limit;
136        }
137    }
138
139    // Fall back to system memory if no cgroup limits found
140    // sysinfo 0.35 already reports bytes (not KiB like older versions)
141    let total_memory_bytes = sys.total_memory();
142    debug!("Using system memory: {} bytes", total_memory_bytes);
143    total_memory_bytes
144}
145
/// Value stored in the `meta` singleton table of `IndexStoreTables`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
}
151
/// Per-DB rpc-index settings, persisted in their own column family so the
/// schema `version` stays a clean monotonic number rather than carrying packed
/// feature bits.
///
/// Stored as JSON inside an opaque `Vec<u8>` value (see [`encode_settings`])
/// rather than as a bcs-encoded struct, so the settings can evolve without a
/// schema break: every field carries `#[serde(default)]`, so a newer binary
/// reading an older DB fills missing fields with defaults, and an older binary
/// reading a newer DB ignores fields it doesn't know — safe in both upgrade
/// directions. Add new flags here with `#[serde(default)]` and that property
/// holds.
///
/// The derived `Default` (all flags off) is also what [`decode_settings`]
/// falls back to when the stored bytes cannot be parsed.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
struct IndexSettings {
    /// Whether this DB was built with ledger-history indexing enabled.
    /// Defaults to `false` when the field is absent from the stored JSON.
    #[serde(default)]
    ledger_history_indexing: bool,
}
169
/// Serialize settings to the JSON bytes stored in the `settings` CF. JSON of a
/// plain flag struct cannot fail, so this does not return a `Result`.
///
/// # Panics
/// Panics if `serde_json` reports a serialization error, which is treated as
/// an invariant violation.
fn encode_settings(settings: &IndexSettings) -> Vec<u8> {
    serde_json::to_vec(settings).expect("IndexSettings is always JSON-serializable")
}
175
176/// Decode `settings`-CF JSON bytes, defaulting on absent/corrupt data. With
177/// `#[serde(default)]` on every field, schema evolution never reaches the
178/// default-on-error path; only genuine corruption does.
179fn decode_settings(bytes: &[u8]) -> IndexSettings {
180    serde_json::from_slice(bytes).unwrap_or_default()
181}
182
/// Checkpoint watermark type; the key of the `watermark` table, whose values
/// are checkpoint sequence numbers.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Watermark for the highest indexed checkpoint (per the `watermark`
    /// table docs).
    Indexed,
    /// Presumably the pruning watermark — confirm against the code that
    /// writes it (not visible here).
    Pruned,
}
189
190#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
191pub struct OwnerIndexKey {
192    pub owner: SuiAddress,
193
194    pub object_type: StructTag,
195
196    // If this object is coin-like (eg 0x2::coin::Coin) then this will be the balance of the coin
197    // inverted `!coin.balance` in order to force sorting of coins to be from greatest to least
198    pub inverted_balance: Option<u64>,
199
200    pub object_id: ObjectID,
201}
202
203impl OwnerIndexKey {
204    // Creates a key from the provided object.
205    // Panics if the provided object is not an Address owned object
206    fn from_object(object: &Object) -> Self {
207        let owner = match object.owner() {
208            Owner::AddressOwner(owner) => owner,
209            Owner::ConsensusAddressOwner { owner, .. } => owner,
210            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
211        };
212        let object_type = object.struct_tag().expect("packages cannot be owned");
213
214        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
215
216        Self {
217            owner: *owner,
218            object_type,
219            inverted_balance,
220            object_id: object.id(),
221        }
222    }
223}
224
225#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
226pub struct OwnerIndexInfo {
227    // object_id and type of this object are a part of the key
228    pub version: SequenceNumber,
229}
230
231impl OwnerIndexInfo {
232    pub fn new(object: &Object) -> Self {
233        Self {
234            version: object.version(),
235        }
236    }
237}
238
/// Key of the `coin` table: one entry per coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// The coin's type tag.
    coin_type: StructTag,
}
243
/// Key of the `balance` table: balances are looked up by owner address and
/// coin type. Field order (owner first) determines the key ordering.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}
249
/// Value of the `coin` table: object ids related to a published coin type.
/// Each id is optional; `CoinIndexInfo::merge` fills in ids that are still
/// `None` from another record.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    // ObjectID of the coin's CoinMetadata object, if known.
    pub coin_metadata_object_id: Option<ObjectID>,
    // Presumably the ObjectID of the coin's treasury cap — TODO confirm.
    pub treasury_object_id: Option<ObjectID>,
    // Presumably the ObjectID of the RegulatedCoinMetadata object — TODO confirm.
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
256
257#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
258pub struct BalanceIndexInfo {
259    pub coin_balance_delta: i128,
260    pub address_balance_delta: i128,
261}
262
263impl BalanceIndexInfo {
264    fn merge_delta(&mut self, other: &Self) {
265        self.coin_balance_delta = self
266            .coin_balance_delta
267            .saturating_add(other.coin_balance_delta);
268        self.address_balance_delta = self
269            .address_balance_delta
270            .saturating_add(other.address_balance_delta);
271    }
272}
273
274impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
275    fn from(index_info: BalanceIndexInfo) -> Self {
276        // Note: We represent balance deltas as i128 to simplify merging positive and negative updates.
277        // Be aware: Move doesn’t enforce a one-time-witness (OTW) pattern when creating a Supply<T>.
278        // Anyone can call `sui::balance::create_supply` and mint unbounded supply, potentially pushing
279        // total balances over u64::MAX. To avoid crashing the indexer, we clamp the merged value instead
280        // of panicking on overflow. This has the unfortunate consequence of making bugs in the index
281        // harder to detect, but is a necessary trade-off to avoid creating a DOS attack vector.
282        let coin_balance = index_info.coin_balance_delta.clamp(0, u64::MAX as i128) as u64;
283        let address_balance = index_info.address_balance_delta.clamp(0, u64::MAX as i128) as u64;
284        sui_types::storage::BalanceInfo {
285            coin_balance,
286            address_balance,
287        }
288    }
289}
290
/// Key of the `package_version` table. Ordering by `original_package_id` then
/// `version` keeps all versions of one package adjacent.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    /// ID of the original package.
    pub original_package_id: ObjectID,
    /// Version number of the package.
    pub version: u64,
}
296
/// Value of the `package_version` table.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    /// Object id under which this package version is stored.
    pub storage_id: ObjectID,
}
301
/// Row of the `tx_seq_digest` table — direct mapping from `tx_sequence_number`
/// to `(digest, event_count, tx_offset, checkpoint_number)`. `event_count` lets
/// event listings enumerate a transaction's event_seqs without rereading the tx
/// row. `tx_offset` is the transaction's zero-based position within its
/// checkpoint, letting readers report a `(checkpoint, tx_offset)` coordinate
/// without recomputing it from the checkpoint's base sequence number.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct TxSeqDigestInfo {
    /// Digest of the transaction.
    pub digest: TransactionDigest,
    /// Number of events for the transaction.
    pub event_count: u32,
    /// Zero-based position of the transaction within its checkpoint.
    pub tx_offset: u32,
    /// Sequence number of the checkpoint for this transaction.
    pub checkpoint_number: CheckpointSequenceNumber,
}
315
/// Row key for both bitmap CFs.
///
/// `dimension_key` is `[tag_byte][value_bytes]` per `sui-inverted-index`.
/// `bucket_id` is the integer division `seq / BUCKET_SIZE` for whichever
/// sequence space the CF is keyed by (tx_seq for `transaction_bitmap`, packed
/// event_seq for `event_bitmap`).
///
/// typed-store encodes keys with bincode `with_big_endian().with_fixint_encoding()`,
/// so the on-disk layout is:
///   [8 B BE length(dimension_key)] [dimension_key bytes] [8 B BE bucket_id]
/// Within a fixed `dimension_key`, range scans over `bucket_id` are
/// numerically ordered. The compaction filter recovers `bucket_id` from the
/// trailing 8 bytes.
///
/// `bucket_id` must remain the last field: `BitmapCompactionFilter::filter`
/// reads it from the final 8 bytes of the encoded key.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct BitmapIndexKey {
    /// Encoded dimension: `[tag_byte][value_bytes]`.
    pub dimension_key: Vec<u8>,
    /// Bucket index within the CF's sequence space (`seq / BUCKET_SIZE`).
    pub bucket_id: u64,
}
334
335/// Value stored in the bitmap CFs: the raw bytes of `RoaringBitmap::serialize_into`.
336///
337/// typed-store BCS-wraps this on disk (ULEB128 length prefix + raw bitmap
338/// bytes). The merge operator decodes operands, ORs them, and re-encodes —
339/// see `bitmap_union_merge_operator`.
340#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
341pub struct BitmapBlob(pub Vec<u8>);
342
343impl From<RoaringBitmap> for BitmapBlob {
344    fn from(bm: RoaringBitmap) -> Self {
345        let mut buf = Vec::with_capacity(bm.serialized_size());
346        bm.serialize_into(&mut buf)
347            .expect("RoaringBitmap::serialize_into on Vec cannot fail");
348        Self(buf)
349    }
350}
351
352fn ledger_tx_seq_digest(tx_sequence_number: u64, info: TxSeqDigestInfo) -> LedgerTxSeqDigest {
353    LedgerTxSeqDigest {
354        tx_sequence_number,
355        digest: info.digest,
356        event_count: info.event_count,
357        tx_offset: info.tx_offset,
358        checkpoint_number: info.checkpoint_number,
359    }
360}
361
362fn decode_ledger_bitmap_bucket(
363    key: BitmapIndexKey,
364    blob: BitmapBlob,
365) -> Result<LedgerBitmapBucket, TypedStoreError> {
366    let bitmap = RoaringBitmap::deserialize_from(&blob.0[..]).map_err(|e| {
367        TypedStoreError::SerializationError(format!("decode ledger bitmap bucket: {e}"))
368    })?;
369    Ok(LedgerBitmapBucket {
370        bucket_id: key.bucket_id,
371        bitmap,
372    })
373}
374
375/// Which sequence space a bitmap CF is keyed by. Owns the whole-bucket
376/// removability math the `BitmapCompactionFilter` applies.
377#[derive(Clone, Copy, Debug, PartialEq, Eq)]
378pub enum BitmapKind {
379    Transaction,
380    Event,
381}
382
383impl BitmapKind {
384    /// Returns true when every seq in `bucket_id` is strictly below
385    /// `pruned_tx_seq_exclusive` — i.e. the whole bucket row is safe to drop.
386    ///
387    /// Both kinds bucket by integer division, but in different sequence
388    /// spaces, while the prune watermark is always in tx-seq space:
389    /// `Transaction` buckets are tx-seq ranges, so the watermark compares
390    /// directly; `Event` buckets are packed-event-seq ranges, so the watermark
391    /// is first converted to its lowest event-seq (`checked_event_seq_lo`).
392    ///
393    /// All arithmetic is `checked_*`: `bucket_id` is untrusted input decoded
394    /// from a rocksdb key, and an overflow must not panic the compaction
395    /// thread — it conservatively returns `false` (keep) instead.
396    fn bucket_fully_pruned(self, bucket_id: u64, pruned_tx_seq_exclusive: u64) -> bool {
397        match self {
398            BitmapKind::Transaction => bucket_id
399                .checked_add(1)
400                .and_then(|b| b.checked_mul(TX_BUCKET_SIZE))
401                .map(|hi| hi <= pruned_tx_seq_exclusive)
402                .unwrap_or(false),
403            BitmapKind::Event => {
404                let bucket_hi = bucket_id
405                    .checked_add(1)
406                    .and_then(|b| b.checked_mul(EVENT_BUCKET_SIZE));
407                let threshold = checked_event_seq_lo(pruned_tx_seq_exclusive);
408                match (bucket_hi, threshold) {
409                    (Some(hi), Some(th)) => hi <= th,
410                    _ => false,
411                }
412            }
413        }
414    }
415}
416
/// Options supplied when opening the index store. The `Default` value carries
/// a zero prune floor.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Shared exclusive tx-seq prune floor for compaction filters.
    /// A zero floor keeps every bucket.
    pub pruning_tx_seq_exclusive: Arc<AtomicU64>,
}
423
/// Baseline options for the index tables: typed-store's default DB options
/// with write throttling disabled.
fn default_table_options() -> typed_store::rocks::DBOptions {
    typed_store::rocks::default_db_options().disable_write_throttling()
}
427
/// Like `default_table_options`, but honors range-delete tombstones immediately
/// instead of ignoring them until compaction (the rocksdb default). `tx_seq_digest`
/// is pruned with `schedule_delete_range` and `first_tx_seq_digest_key` reads the
/// resulting pruning floor straight back, so tombstones must be visible to reads at
/// once or the floor would not advance until a compaction happened to run.
fn tx_seq_digest_table_options() -> typed_store::rocks::DBOptions {
    let mut options = default_table_options();
    // Only the read/write options change; everything else stays at the
    // `default_table_options` baseline.
    options.rw_options = options.rw_options.clone().set_ignore_range_deletions(false);
    options
}
438
439fn balance_delta_merge_operator(
440    _key: &[u8],
441    existing_val: Option<&[u8]>,
442    operands: &MergeOperands,
443) -> Option<Vec<u8>> {
444    let mut result = if let Some(existing_val) = existing_val {
445        bcs::from_bytes::<BalanceIndexInfo>(existing_val)
446            .inspect_err(|e| {
447                tracing::error!(
448                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
449                )
450            })
451            .ok()?
452    } else {
453        BalanceIndexInfo::default()
454    };
455
456    for operand in operands.iter() {
457        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
458            .inspect_err(|e| {
459                tracing::error!(
460                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
461                )
462            })
463            .ok()?;
464        result.merge_delta(&delta);
465    }
466
467    Some(
468        bcs::to_bytes(&result)
469            .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
470    )
471}
472
473fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
474    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
475        Ok(info) => info,
476        Err(_) => return Decision::Keep,
477    };
478
479    if balance_info.coin_balance_delta == 0 && balance_info.address_balance_delta == 0 {
480        Decision::Remove
481    } else {
482        Decision::Keep
483    }
484}
485
/// Options for the `balance` table: the baseline table options plus the
/// delta-summing merge operator (`balance_delta_merge_operator`) and the
/// compaction filter that removes all-zero entries (`balance_compaction_filter`).
fn balance_table_options() -> typed_store::rocks::DBOptions {
    default_table_options()
        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
}
491
492// Bitmap inverted index: merge operator, compaction filter, options.
493
494fn decode_bitmap_blob(bcs_bytes: &[u8]) -> Result<RoaringBitmap, anyhow::Error> {
495    let blob: BitmapBlob = bcs::from_bytes(bcs_bytes)?;
496    Ok(RoaringBitmap::deserialize_from(&blob.0[..])?)
497}
498
499fn encode_bitmap_blob(bm: &RoaringBitmap) -> Vec<u8> {
500    let mut buf = Vec::with_capacity(bm.serialized_size());
501    bm.serialize_into(&mut buf)
502        .expect("RoaringBitmap::serialize_into on Vec cannot fail");
503    bcs::to_bytes(&BitmapBlob(buf)).expect("BCS encode of BitmapBlob cannot fail")
504}
505
506/// RocksDB merge operator for both bitmap CFs. ORs all operands (and any
507/// existing on-disk value) into a single bitmap.
508fn bitmap_union_merge_operator(
509    _key: &[u8],
510    existing_val: Option<&[u8]>,
511    operands: &MergeOperands,
512) -> Option<Vec<u8>> {
513    let mut acc = match existing_val {
514        Some(v) => match decode_bitmap_blob(v) {
515            Ok(bm) => bm,
516            Err(e) => {
517                tracing::error!(
518                    "Failed to deserialize existing BitmapBlob during merge - data corruption: {e}"
519                );
520                return None;
521            }
522        },
523        None => RoaringBitmap::new(),
524    };
525
526    for operand in operands.iter() {
527        match decode_bitmap_blob(operand) {
528            Ok(bm) => acc |= bm,
529            Err(e) => {
530                tracing::error!(
531                    "Failed to deserialize BitmapBlob operand during merge - data corruption: {e}"
532                );
533                return None;
534            }
535        }
536    }
537
538    // Convert dense containers to run containers before serializing the
539    // accumulated bitmap. This is the on-disk representation (the merge
540    // operator's output is what RocksDB stores), and a bucket that matches
541    // many consecutive tx_seqs compresses substantially as runs. Mirrors the
542    // BigTable bitmap committer, which optimizes each row before writing.
543    // Operands are not optimized — they carry a bit or a handful, so there is
544    // nothing for run-encoding to collapse.
545    acc.optimize();
546    Some(encode_bitmap_blob(&acc))
547}
548
549/// Whole-bucket compaction filter for bitmap CFs. Reads the trailing 8 bytes
550/// of a typed-store key as `bucket_id` (bincode big-endian fixed-int), then
551/// removes the row iff the bucket is entirely below the current
552/// `tx_seq_pruning_watermark` exclusive value.
553///
554/// Never `Remove` on a parse failure: silent data loss is worse than a stuck
555/// row. The bucket math is `checked_*` because `bucket_id` is untrusted input
556/// from rocksdb — a corrupted key shouldn't be able to panic the compaction
557/// thread.
558#[derive(Clone)]
559pub struct BitmapCompactionFilter {
560    pruning_tx_seq_exclusive: Arc<AtomicU64>,
561    kind: BitmapKind,
562}
563
564impl BitmapCompactionFilter {
565    pub fn new(pruning_tx_seq_exclusive: Arc<AtomicU64>, kind: BitmapKind) -> Self {
566        Self {
567            pruning_tx_seq_exclusive,
568            kind,
569        }
570    }
571
572    pub fn filter(&self, key: &[u8], _value: &[u8]) -> Decision {
573        if key.len() < 8 {
574            warn!(
575                kind = ?self.kind,
576                "bitmap compaction filter saw key shorter than 8 bytes ({}); keeping",
577                key.len(),
578            );
579            return Decision::Keep;
580        }
581        let bucket_id =
582            u64::from_be_bytes(key[key.len() - 8..].try_into().expect("len checked above"));
583        let pruned_exclusive = self.pruning_tx_seq_exclusive.load(Ordering::Relaxed);
584
585        if self.kind.bucket_fully_pruned(bucket_id, pruned_exclusive) {
586            Decision::Remove
587        } else {
588            Decision::Keep
589        }
590    }
591}
592
/// Default bitmap CF options. The merge operator must be present on every open;
/// `bitmap_cf_options` adds the runtime compaction filter.
///
/// Builds on `default_table_options` and registers
/// `bitmap_union_merge_operator` under the name `"bitmap_union_merge"`.
fn bitmap_cf_default_options() -> typed_store::rocks::DBOptions {
    default_table_options()
        .set_merge_operator_associative("bitmap_union_merge", bitmap_union_merge_operator)
}
599
/// Bitmap CF options with the per-CF compaction filter attached.
///
/// Starts from `bitmap_cf_default_options` (so the union merge operator is
/// present) and installs `filter` as the CF's compaction filter under
/// `filter_name`.
fn bitmap_cf_options(
    filter_name: &str,
    filter: BitmapCompactionFilter,
) -> typed_store::rocks::DBOptions {
    let mut options = bitmap_cf_default_options();
    // The closure takes ownership of `filter`; the compaction level is ignored.
    options
        .options
        .set_compaction_filter(filter_name, move |_level, key, value| {
            filter.filter(key, value)
        });
    options
}
613
614impl CoinIndexInfo {
615    fn merge(&mut self, other: Self) {
616        self.coin_metadata_object_id = self
617            .coin_metadata_object_id
618            .or(other.coin_metadata_object_id);
619        self.regulated_coin_metadata_object_id = self
620            .regulated_coin_metadata_object_id
621            .or(other.regulated_coin_metadata_object_id);
622        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
623    }
624}
625
/// RocksDB tables for the RpcIndexStore
///
/// Anytime a new table is added, or an existing one has its schema changed, make sure to also
/// update the value of `CURRENT_DB_VERSION`.
///
/// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
/// - bounded in size by the live object set
/// - are prune-able and have corresponding logic in the `prune` function
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables will still be empty post
    ///   initialization)
    /// - version of the DB. Every time a new table is added or a schema is changed the version
    ///   number needs to be incremented.
    meta: DBMap<(), MetadataInfo>,

    /// A singleton recording which optional features this DB was built with
    /// (currently just ledger-history indexing). Kept separate from `meta` so
    /// the schema `version` need not encode feature flags. The value is
    /// `IndexSettings` as JSON bytes (see `IndexSettings`); auto-created on older
    /// DBs, and a missing entry reads as `IndexSettings::default()` (all features
    /// off).
    settings: DBMap<(), Vec<u8>>,

    /// Table used to track watermark for the highest indexed checkpoint
    ///
    /// This is useful to help know the highest checkpoint that was indexed in the event that the
    /// node was running with indexes enabled, then run for a period of time with indexes disabled,
    /// and then run with them enabled again so that the tables can be reinitialized.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// An index of extra metadata for Epochs.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// An index of extra metadata for Transactions.
    ///
    /// Only contains entries for transactions which have yet to be pruned from the main database.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// An index of object ownership.
    ///
    /// Allows an efficient iterator to list all objects currently owned by a specific user
    /// account.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// An index of dynamic fields (children objects).
    ///
    /// Allows an efficient iterator to list all of the dynamic fields owned by a particular
    /// ObjectID.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// An index of Coin Types
    ///
    /// Allows looking up information related to published Coins, like the ObjectID of its
    /// corresponding CoinMetadata.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// An index of Balances.
    ///
    /// Allows looking up balances by owner address and coin type.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// An index of Package versions.
    ///
    /// Maps original package ID and version to the storage ID of that version.
    /// Allows efficient listing of all versions of a package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// `tx_sequence_number` → (digest, event_count, tx_offset, checkpoint_number).
    #[default_options_override_fn = "tx_seq_digest_table_options"]
    tx_seq_digest: DBMap<u64, TxSeqDigestInfo>,

    /// Transaction bitmap index keyed by `(dimension_key, tx_seq bucket)`.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    transaction_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,

    /// Event bitmap index keyed by `(dimension_key, packed event_seq bucket)`.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    event_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,
    // NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
    // - bounded in size by the live object set
    // - are prune-able and have corresponding logic in the `prune` function
}
724
725/// Completely empty a column family and physically reclaim its disk. A single
726/// range tombstone over `[first, last)` (end-exclusive, so paired with a point
727/// delete of `last`) drops every row in O(1). The range is then compacted so
728/// the data is physically removed: this reclaims the space immediately rather
729/// than waiting for a background compaction, and makes the rows invisible to
730/// reads regardless of the CF's `ignore_range_deletions` setting.
731fn clear_table<K, V>(map: &DBMap<K, V>) -> Result<(), TypedStoreError>
732where
733    K: Serialize + DeserializeOwned,
734    V: Serialize + DeserializeOwned,
735{
736    let first = map.safe_iter().next().transpose()?.map(|(k, _)| k);
737    let last = map
738        .reversed_safe_iter_with_bounds(None, None)?
739        .next()
740        .transpose()?
741        .map(|(k, _)| k);
742    let (Some(first), Some(last)) = (first, last) else {
743        return Ok(());
744    };
745    let mut batch = map.batch();
746    batch.schedule_delete_range(map, &first, &last)?;
747    batch.delete_batch(map, std::iter::once(&last))?;
748    batch.write()?;
749    map.compact_range(&first, &last)
750}
751
752impl IndexStoreTables {
753    fn extract_version_if_package(
754        object: &Object,
755    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
756        if let Data::Package(package) = &object.data {
757            let original_id = package.original_package_id();
758            let version = package.version().value();
759            let storage_id = object.id();
760
761            let key = PackageVersionKey {
762                original_package_id: original_id,
763                version,
764            };
765            let info = PackageVersionInfo { storage_id };
766            return Some((key, info));
767        }
768        None
769    }
770
771    fn open_with_index_options<P: Into<PathBuf>>(
772        path: P,
773        index_options: IndexStoreOptions,
774    ) -> Self {
775        // The typed-store derive macro only honors the per-field
776        // `default_options_override_fn` when `tables_db_options_override` is
777        // `None`. As soon as we pass `Some(map)`, any CF missing from the map
778        // silently falls back to bare `default_db_options()` — losing the
779        // `disable_write_throttling` applied by `default_table_options`. To
780        // avoid that, populate the map with `default_table_options()` for every
781        // table before overriding the few that need bespoke configuration.
782        let mut table_options = std::collections::BTreeMap::new();
783        for (table_name, _) in IndexStoreTables::describe_tables() {
784            table_options.insert(table_name, default_table_options());
785        }
786        table_options.insert("balance".to_string(), balance_table_options());
787        // Range-delete pruning needs tombstones honored by reads immediately.
788        table_options.insert("tx_seq_digest".to_string(), tx_seq_digest_table_options());
789
790        let bitmap_filter_tx = BitmapCompactionFilter::new(
791            index_options.pruning_tx_seq_exclusive.clone(),
792            BitmapKind::Transaction,
793        );
794        let bitmap_filter_event = BitmapCompactionFilter::new(
795            index_options.pruning_tx_seq_exclusive.clone(),
796            BitmapKind::Event,
797        );
798        table_options.insert(
799            "transaction_bitmap".to_string(),
800            bitmap_cf_options("transaction_bitmap_filter", bitmap_filter_tx),
801        );
802        table_options.insert(
803            "event_bitmap".to_string(),
804            bitmap_cf_options("event_bitmap_filter", bitmap_filter_event),
805        );
806
807        IndexStoreTables::open_tables_read_write_with_deprecation_option(
808            path.into(),
809            MetricConf::new("rpc-index"),
810            None,
811            Some(DBMapTableConfigMap::new(table_options)),
812            true, // remove deprecated tables
813        )
814    }
815
816    fn open_with_options<P: Into<PathBuf>>(
817        path: P,
818        options: typed_store::rocksdb::Options,
819        table_options: Option<DBMapTableConfigMap>,
820    ) -> Self {
821        IndexStoreTables::open_tables_read_write_with_deprecation_option(
822            path.into(),
823            MetricConf::new("rpc-index"),
824            Some(options),
825            table_options,
826            true, // remove deprecated tables
827        )
828    }
829
830    /// Whether the ledger-history feature was enabled when this DB was built,
831    /// as persisted in the `settings` CF. A missing entry (fresh or pre-feature
832    /// DB) reads as `false`.
833    fn persisted_ledger_history_indexing(&self) -> bool {
834        self.read_settings().ledger_history_indexing
835    }
836
837    /// Read the persisted settings, defaulting when the row is absent or
838    /// unreadable.
839    fn read_settings(&self) -> IndexSettings {
840        self.settings
841            .get(&())
842            .ok()
843            .flatten()
844            .map(|bytes| decode_settings(&bytes))
845            .unwrap_or_default()
846    }
847
848    fn needs_to_do_initialization(
849        &self,
850        checkpoint_store: &CheckpointStore,
851        ledger_history_indexing: bool,
852    ) -> bool {
853        let schema_stale = match self.meta.get(&()) {
854            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
855            Ok(None) | Err(_) => true,
856        };
857        // *Enabling* ledger history requires a full rebuild to backfill the
858        // historical rows. *Disabling* it does not — the base indexes stay
859        // valid, so the now-unused history CFs are dropped in place by
860        // `disable_ledger_history_indexing` instead.
861        let enabling = ledger_history_indexing && !self.persisted_ledger_history_indexing();
862        schema_stale || enabling || self.is_indexed_watermark_out_of_date(checkpoint_store)
863    }
864
865    /// Drop the ledger-history column families and clear the persisted feature
866    /// flag without rebuilding the rest of the index. Used when a node that had
867    /// ledger history enabled restarts with it disabled: the base indexes are
868    /// untouched, so only the now-unused history rows need to go. Idempotent —
869    /// it runs at open before the store goes live, so a crash mid-way just
870    /// re-runs it on the next start.
871    fn disable_ledger_history_indexing(&self) -> Result<(), TypedStoreError> {
872        clear_table(&self.tx_seq_digest)?;
873        clear_table(&self.transaction_bitmap)?;
874        clear_table(&self.event_bitmap)?;
875        self.settings.insert(
876            &(),
877            &encode_settings(&IndexSettings {
878                ledger_history_indexing: false,
879            }),
880        )?;
881        Ok(())
882    }
883
884    // Check if the index watermark is behind the highets_executed watermark.
885    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
886        let highest_executed_checkpint = checkpoint_store
887            .get_highest_executed_checkpoint_seq_number()
888            .ok()
889            .flatten();
890        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
891        watermark < highest_executed_checkpint
892    }
893
894    #[tracing::instrument(skip_all)]
895    fn init(
896        &mut self,
897        authority_store: &AuthorityStore,
898        checkpoint_store: &CheckpointStore,
899        _epoch_store: &AuthorityPerEpochStore,
900        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
901        batch_size_limit: usize,
902        rpc_config: &sui_config::RpcConfig,
903    ) -> Result<(), StorageError> {
904        info!("Initializing RPC indexes");
905
906        let highest_executed_checkpint =
907            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
908        let lowest_available_checkpoint = checkpoint_store
909            .get_highest_pruned_checkpoint_seq_number()?
910            .map(|c| c.saturating_add(1))
911            .unwrap_or(0);
912        let lowest_available_checkpoint_objects = authority_store
913            .perpetual_tables
914            .get_highest_pruned_checkpoint()?
915            .map(|c| c.saturating_add(1))
916            .unwrap_or(0);
917        // Doing backfill requires processing objects so we have to restrict our backfill range
918        // to the range of checkpoints that we have objects for.
919        let lowest_available_checkpoint =
920            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);
921
922        let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
923            lowest_available_checkpoint..=highest_executed_checkpint
924        });
925
926        if let Some(checkpoint_range) = checkpoint_range.clone() {
927            self.index_existing_checkpoints(authority_store, checkpoint_store, checkpoint_range)?;
928        }
929
930        if rpc_config.ledger_history_indexing()
931            && let Some(checkpoint_range) = checkpoint_range
932        {
933            self.backfill_ledger_history_indexes(
934                authority_store,
935                checkpoint_store,
936                checkpoint_range,
937            )?;
938        }
939
940        self.initialize_current_epoch(authority_store, checkpoint_store)?;
941
942        // Only index live objects if genesis checkpoint has been executed.
943        // If genesis hasn't been executed yet, the objects will be properly indexed
944        // as checkpoints are processed through the normal checkpoint execution path.
945        if highest_executed_checkpint.is_some() {
946            let coin_index = Mutex::new(HashMap::new());
947
948            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
949                tables: self,
950                coin_index: &coin_index,
951                batch_size_limit,
952            };
953
954            crate::par_index_live_object_set::par_index_live_object_set(
955                authority_store,
956                &make_live_object_indexer,
957            )?;
958
959            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
960        }
961
962        self.watermark.insert(
963            &Watermark::Indexed,
964            &highest_executed_checkpint.unwrap_or(0),
965        )?;
966
967        // Write the schema version and the feature settings in one batch so the
968        // two can never be persisted independently.
969        let mut batch = self.meta.batch();
970        batch.insert_batch(
971            &self.meta,
972            [(
973                (),
974                MetadataInfo {
975                    version: CURRENT_DB_VERSION,
976                },
977            )],
978        )?;
979        batch.insert_batch(
980            &self.settings,
981            [(
982                (),
983                encode_settings(&IndexSettings {
984                    ledger_history_indexing: rpc_config.ledger_history_indexing(),
985                }),
986            )],
987        )?;
988        batch.write()?;
989
990        info!("Finished initializing RPC indexes");
991
992        Ok(())
993    }
994
995    #[tracing::instrument(skip(self, authority_store, checkpoint_store))]
996    fn index_existing_checkpoints(
997        &mut self,
998        authority_store: &AuthorityStore,
999        checkpoint_store: &CheckpointStore,
1000        checkpoint_range: std::ops::RangeInclusive<u64>,
1001    ) -> Result<(), StorageError> {
1002        info!(
1003            "Indexing {} checkpoints in range {checkpoint_range:?}",
1004            checkpoint_range.size_hint().0
1005        );
1006        let start_time = Instant::now();
1007
1008        checkpoint_range.into_par_iter().try_for_each(|seq| {
1009            let Some(checkpoint) =
1010                sparse_checkpoint_for_epoch_backfill(authority_store, checkpoint_store, seq)?
1011            else {
1012                return Ok(());
1013            };
1014
1015            let mut batch = self.epochs.batch();
1016
1017            self.index_epoch(&checkpoint, &mut batch)?;
1018
1019            batch
1020                .write_opt(bulk_ingestion_write_options())
1021                .map_err(StorageError::from)
1022        })?;
1023
1024        info!(
1025            "Indexing checkpoints took {} seconds",
1026            start_time.elapsed().as_secs()
1027        );
1028        Ok(())
1029    }
1030
1031    /// Backfill ledger history rows over a freshly recreated rpc-index DB.
1032    ///
1033    /// Bulk writes disable WAL, so this flushes before `init()` writes the
1034    /// `meta.version` / `settings` markers; otherwise a crash could persist
1035    /// those markers without the rows they claim to cover.
1036    fn backfill_ledger_history_indexes(
1037        &self,
1038        authority_store: &AuthorityStore,
1039        checkpoint_store: &CheckpointStore,
1040        checkpoint_range: std::ops::RangeInclusive<u64>,
1041    ) -> Result<(), StorageError> {
1042        info!("ledger history backfill: cps {checkpoint_range:?}");
1043        let start_time = Instant::now();
1044
1045        checkpoint_range.clone().into_par_iter().try_for_each(
1046            |seq| -> Result<(), StorageError> {
1047                let checkpoint =
1048                    full_checkpoint_for_backfill(authority_store, checkpoint_store, seq)?
1049                        .ok_or_else(|| {
1050                            // Missing retained data would leave a permanent hole.
1051                            StorageError::missing(format!(
1052                                "ledger history backfill: checkpoint {seq} is missing from local \
1053                                 storage but falls inside the retained backfill range \
1054                                 {checkpoint_range:?}"
1055                            ))
1056                        })?;
1057                let mut batch = self.meta.batch();
1058                self.write_ledger_history_rows_for_checkpoint(&checkpoint, &mut batch)?;
1059                batch
1060                    .write_opt(bulk_ingestion_write_options())
1061                    .map_err(StorageError::from)
1062            },
1063        )?;
1064
1065        // Flushing one CF flushes the whole shared RocksDB instance.
1066        self.tx_seq_digest.flush().map_err(|e| {
1067            StorageError::custom(format!("flush after ledger history backfill: {e}"))
1068        })?;
1069
1070        info!(
1071            "ledger history backfill took {} seconds",
1072            start_time.elapsed().as_secs()
1073        );
1074        Ok(())
1075    }
1076
1077    /// The lowest live key of `tx_seq_digest`: the ledger history pruning
1078    /// floor in tx-seq space. `prune()` maintains the invariant that this
1079    /// equals the highest fully-pruned tx_seq (exclusive): pruning
1080    /// point-deletes `tx_seq_digest` rows below the floor, and forward
1081    /// indexing only adds rows above it. Returns `None` when the CF is empty
1082    /// (nothing indexed yet), which callers treat as floor 0.
1083    fn first_tx_seq_digest_key(&self) -> Result<Option<u64>, TypedStoreError> {
1084        match self.tx_seq_digest.safe_iter().next() {
1085            Some(Ok((k, _))) => Ok(Some(k)),
1086            Some(Err(e)) => Err(e),
1087            None => Ok(None),
1088        }
1089    }
1090
1091    /// Prune data from this Index. `pruned_tx_seq_exclusive` is the absolute
1092    /// tx-seq floor after this prune — the caller derives it from the
1093    /// last-pruned checkpoint's `network_total_transactions`.
1094    ///
1095    /// When ledger history is enabled, the compaction-filter atomic is moved to
1096    /// the new floor *after* the batch commits, so the atomic never leads disk.
1097    fn prune(
1098        &self,
1099        pruned_checkpoint_watermark: u64,
1100        pruned_tx_seq_exclusive: u64,
1101        ledger_history_enabled: bool,
1102        pruning_atomic: &AtomicU64,
1103    ) -> Result<(), TypedStoreError> {
1104        let mut batch = self.watermark.batch();
1105
1106        batch.insert_batch(
1107            &self.watermark,
1108            [(Watermark::Pruned, pruned_checkpoint_watermark)],
1109        )?;
1110
1111        if ledger_history_enabled {
1112            // First live `tx_seq_digest` key = current floor, read from disk so a
1113            // prune skipped by a crash self-heals on the next call. The range
1114            // delete shares the batch with the `Watermark::Pruned` insert above.
1115            let prev_exclusive = self.first_tx_seq_digest_key()?.unwrap_or(0);
1116            batch.schedule_delete_range(
1117                &self.tx_seq_digest,
1118                &prev_exclusive,
1119                &pruned_tx_seq_exclusive,
1120            )?;
1121        }
1122
1123        batch.write()?;
1124
1125        if ledger_history_enabled {
1126            // After the commit so the atomic never leads disk.
1127            pruning_atomic.store(pruned_tx_seq_exclusive, Ordering::Relaxed);
1128        }
1129        Ok(())
1130    }
1131
1132    /// Index a Checkpoint
1133    fn index_checkpoint(
1134        &self,
1135        checkpoint: &Checkpoint,
1136        ledger_history_enabled: bool,
1137    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
1138        debug!(
1139            checkpoint = checkpoint.summary.sequence_number,
1140            "indexing checkpoint"
1141        );
1142
1143        let mut batch = self.owner.batch();
1144
1145        self.index_epoch(checkpoint, &mut batch)?;
1146        self.index_transactions(checkpoint, &mut batch)?;
1147        self.index_objects(checkpoint, &mut batch)?;
1148
1149        // Ledger history rows ride the same batch as `Watermark::Indexed`.
1150        if ledger_history_enabled {
1151            self.write_ledger_history_rows_for_checkpoint(checkpoint, &mut batch)?;
1152        }
1153
1154        batch.insert_batch(
1155            &self.watermark,
1156            [(Watermark::Indexed, checkpoint.summary.sequence_number)],
1157        )?;
1158
1159        debug!(
1160            checkpoint = checkpoint.summary.sequence_number,
1161            "finished indexing checkpoint"
1162        );
1163
1164        Ok(batch)
1165    }
1166
1167    fn index_epoch(
1168        &self,
1169        checkpoint: &Checkpoint,
1170        batch: &mut typed_store::rocks::DBBatch,
1171    ) -> Result<(), StorageError> {
1172        let Some(epoch_info) = checkpoint.epoch_info()? else {
1173            return Ok(());
1174        };
1175        if epoch_info.epoch > 0 {
1176            let prev_epoch = epoch_info.epoch - 1;
1177            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
1178            current_epoch.epoch = prev_epoch; // set this incase there wasn't an entry
1179            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
1180            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
1181            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
1182        }
1183        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
1184        Ok(())
1185    }
1186
1187    // After attempting to reindex past epochs, ensure that the current epoch is at least partially
1188    // initalized
1189    fn initialize_current_epoch(
1190        &mut self,
1191        authority_store: &AuthorityStore,
1192        checkpoint_store: &CheckpointStore,
1193    ) -> Result<(), StorageError> {
1194        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
1195            return Ok(());
1196        };
1197
1198        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
1199            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
1200
1201        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
1202        epoch.epoch = checkpoint.epoch;
1203
1204        if epoch.protocol_version.is_none() {
1205            epoch.protocol_version = Some(system_state.protocol_version());
1206        }
1207
1208        if epoch.start_timestamp_ms.is_none() {
1209            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
1210        }
1211
1212        if epoch.reference_gas_price.is_none() {
1213            epoch.reference_gas_price = Some(system_state.reference_gas_price());
1214        }
1215
1216        if epoch.system_state.is_none() {
1217            epoch.system_state = Some(system_state);
1218        }
1219
1220        self.epochs.insert(&epoch.epoch, &epoch)?;
1221
1222        Ok(())
1223    }
1224
1225    fn index_transactions(
1226        &self,
1227        checkpoint: &Checkpoint,
1228        batch: &mut typed_store::rocks::DBBatch,
1229    ) -> Result<(), StorageError> {
1230        for tx in &checkpoint.transactions {
1231            let balance_changes = sui_types::balance_change::derive_detailed_balance_changes_2(
1232                &tx.effects,
1233                &checkpoint.object_set,
1234            )
1235            .into_iter()
1236            .filter_map(|change| {
1237                if let TypeTag::Struct(coin_type) = change.coin_type {
1238                    Some((
1239                        BalanceKey {
1240                            owner: change.address,
1241                            coin_type: *coin_type,
1242                        },
1243                        BalanceIndexInfo {
1244                            coin_balance_delta: change.coin_amount,
1245                            address_balance_delta: change.address_amount,
1246                        },
1247                    ))
1248                } else {
1249                    None
1250                }
1251            });
1252            batch.partial_merge_batch(&self.balance, balance_changes)?;
1253        }
1254
1255        Ok(())
1256    }
1257
    /// Emit `tx_seq_digest` rows and bitmap merge operands for every tx in
    /// `checkpoint`. Shared by forward indexing (`index_checkpoint`) and the
    /// rebuild-time ledger history backfill. There is no separate watermark:
    /// `Watermark::Indexed` is the source of truth for coverage.
    ///
    /// All rows are staged into `batch`; nothing is written here, so the
    /// caller decides when and how the batch is committed.
    ///
    /// Transaction sequence numbers are assigned from the checkpoint summary:
    /// the first tx of the checkpoint gets
    /// `network_total_transactions - transactions.len()`, and each following
    /// tx gets the next number.
    ///
    /// # Errors
    ///
    /// Returns a `StorageError` when `network_total_transactions` is smaller
    /// than the checkpoint's own tx count, when `checked_encode_event_seq`
    /// rejects an event position (the first such error is returned), or when
    /// staging into `batch` fails.
    fn write_ledger_history_rows_for_checkpoint(
        &self,
        checkpoint: &Checkpoint,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let cp_seq = checkpoint.summary.sequence_number;
        let net_total = checkpoint.summary.data().network_total_transactions;
        let tx_count = checkpoint.transactions.len() as u64;
        // `network_total_transactions` is cumulative *including* this cp.
        // checked_sub: if the cp's network_total_transactions is somehow
        // less than its own tx count, surface an error rather than wrap.
        let tx_lo = net_total.checked_sub(tx_count).ok_or_else(|| {
            StorageError::custom(format!(
                "checkpoint {cp_seq}: network_total_transactions ({net_total}) \
                 < tx_count ({tx_count})"
            ))
        })?;

        let object_set = &checkpoint.object_set;

        // Group tx-space bitmap bits across the whole checkpoint so repeated
        // dimensions in the same tx bucket produce one Rocks merge operand.
        // Keyed by (encoded dimension key, tx bucket id).
        let mut tx_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();

        for (i, tx) in checkpoint.transactions.iter().enumerate() {
            // Global sequence number of this tx: checkpoint base + position.
            let tx_seq = tx_lo + i as u64;

            let tx_data = &tx.transaction;
            let digest = *tx.effects.transaction_digest();
            // A tx without events counts as zero.
            // NOTE(review): `as u32` would truncate silently for more than
            // u32::MAX events — presumably impossible; confirm.
            let event_count = tx.events.as_ref().map(|e| e.data.len() as u32).unwrap_or(0);

            // tx_seq_digest: one direct row per tx, no merge needed.
            batch.insert_batch(
                &self.tx_seq_digest,
                [(
                    tx_seq,
                    TxSeqDigestInfo {
                        digest,
                        event_count,
                        // `i` is the transaction's zero-based position within this checkpoint.
                        tx_offset: i as u32,
                        checkpoint_number: cp_seq,
                    },
                )],
            )?;

            // Tx-space bitmap: dedup dimension_keys within this tx, then add
            // this tx's bit to the checkpoint-scoped bitmap group.
            let tx_bucket = tx_seq / TX_BUCKET_SIZE;
            let tx_bit = (tx_seq % TX_BUCKET_SIZE) as u32;
            let mut tx_dim_keys: FxHashSet<Vec<u8>> = FxHashSet::default();
            for_each_transaction_dimension(
                tx_data,
                &tx.effects,
                tx.events.as_ref(),
                object_set,
                |dim, value| {
                    tx_dim_keys.insert(encode_dimension_key(dim, value));
                },
            );
            for dim_key in tx_dim_keys {
                tx_groups
                    .entry((dim_key, tx_bucket))
                    .or_default()
                    .insert(tx_bit);
            }

            // Event-space bitmap: bits from multiple events of the same tx
            // can share a (dim_key, bucket); group into a RoaringBitmap so
            // we emit at most one operand per group.
            let mut event_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();
            // The callback cannot return an error, so the first encoding
            // failure is parked here and checked once the walk is done.
            let mut event_seq_error = None;
            for_each_event_dimension(
                tx_data.sender(),
                &tx.effects,
                tx.events.as_ref(),
                |event_idx, dim, value| {
                    let event_seq = match checked_encode_event_seq(tx_seq, event_idx) {
                        Ok(event_seq) => event_seq,
                        Err(e) => {
                            // Keep only the first error; later ones are dropped.
                            event_seq_error.get_or_insert(e);
                            return;
                        }
                    };
                    let bucket = event_seq / EVENT_BUCKET_SIZE;
                    let bit = (event_seq % EVENT_BUCKET_SIZE) as u32;
                    event_groups
                        .entry((encode_dimension_key(dim, value), bucket))
                        .or_default()
                        .insert(bit);
                },
            );
            if let Some(e) = event_seq_error {
                return Err(e);
            }
            // Event operands are staged per tx, unlike the tx-space operands,
            // which are staged once after the loop.
            let event_ops = event_groups.into_iter().map(|((dim_key, bucket), bm)| {
                (
                    BitmapIndexKey {
                        dimension_key: dim_key,
                        bucket_id: bucket,
                    },
                    BitmapBlob::from(bm),
                )
            });
            batch.partial_merge_batch(&self.event_bitmap, event_ops)?;
        }

        // One merge operand per (dimension key, tx bucket) for the whole checkpoint.
        let tx_ops = tx_groups.into_iter().map(|((dim_key, bucket), bm)| {
            (
                BitmapIndexKey {
                    dimension_key: dim_key,
                    bucket_id: bucket,
                },
                BitmapBlob::from(bm),
            )
        });
        batch.partial_merge_batch(&self.transaction_bitmap, tx_ops)?;

        Ok(())
    }
1382
    /// Stage owner, dynamic-field, coin and package-version index updates for
    /// every transaction in `checkpoint` into `batch`.
    ///
    /// Per transaction, in order:
    /// 1. entries of removed objects are deleted (using each object's state
    ///    before the transaction);
    /// 2. for changed objects, the entry of the old version is deleted and an
    ///    entry for the new version is inserted; packages are collected for
    ///    the package-version index;
    /// 3. coin index info derived from created objects is accumulated.
    ///
    /// The accumulated coin and package-version entries are staged once, after
    /// all transactions have been processed. Nothing is written here.
    ///
    /// # Panics
    ///
    /// Hits `todo!()` for any object whose owner is `Owner::Party`.
    fn index_objects(
        &self,
        checkpoint: &Checkpoint,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        // Accumulated across all transactions of the checkpoint and staged at the end.
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];
        let object_set = &checkpoint.object_set;

        for tx in &checkpoint.transactions {
            // determine changes from removed objects
            for removed_object in tx_removed_objects_pre_version(tx, object_set) {
                match removed_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    // Shared and immutable objects have no entry in these indexes.
                    Owner::Shared { .. } | Owner::Immutable => {}
                    Owner::Party { .. } => {
                        // TODO(Party WIP)
                        todo!("Party WIP");
                        // We could maybe look at non-default permissions for the owner. But
                        // I'm not sure what this is really used for
                    }
                }
            }

            // determine changes from changed objects
            for (object, old_object) in tx_changed_objects(tx, object_set) {
                // Drop the entry that belonged to the previous version, if there was one.
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            // Only delete when the owner changed; with an unchanged
                            // owner the old dynamic-field entry is left in place.
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}

                        Owner::Party { .. } => {
                            // TODO(Party WIP)
                            todo!("Party WIP");
                            // We could maybe look at non-default permissions for the owner. But
                            // I'm not sure what this is really used for
                        }
                    }
                }

                // Add the entry for the new version of the object.
                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                    // TODO(Party WIP)
                    Owner::Party { .. } => todo!("Party WIP"),
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // coin indexing
            //
            // coin indexing relies on the fact that CoinMetadata and TreasuryCap are created in
            // the same transaction so we don't need to worry about overriding any older value
            // that may exist in the database (because there necessarily cannot be).
            for (key, value) in tx
                .created_objects(object_set)
                .flat_map(try_create_coin_index_info)
            {
                use std::collections::hash_map::Entry;

                // Several created objects can map to the same key; merge them into one entry.
                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
1493
1494    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
1495        self.epochs.get(&epoch)
1496    }
1497
1498    fn owner_iter(
1499        &self,
1500        owner: SuiAddress,
1501        object_type: Option<StructTag>,
1502        cursor: Option<OwnerIndexKey>,
1503    ) -> Result<
1504        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1505        TypedStoreError,
1506    > {
1507        // TODO can we figure out how to pass a raw byte array as a cursor?
1508        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
1509            owner,
1510            object_type: object_type
1511                .clone()
1512                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
1513            inverted_balance: None,
1514            object_id: ObjectID::ZERO,
1515        });
1516
1517        Ok(self
1518            .owner
1519            .safe_iter_with_bounds(Some(lower_bound), None)
1520            .take_while(move |item| {
1521                // If there's an error let if flow through
1522                let Ok((key, _)) = item else {
1523                    return true;
1524                };
1525
1526                // Only take if owner matches
1527                key.owner == owner
1528                    // and if an object type was supplied that the type matches
1529                    && object_type
1530                        .as_ref()
1531                        .map(|ty| {
1532                            ty.address == key.object_type.address
1533                                && ty.module == key.object_type.module
1534                                && ty.name == key.object_type.name
1535                                // If type_params are not provided then we match all params
1536                                && (ty.type_params.is_empty() ||
1537                                    // If they are provided the type params must match
1538                                    ty.type_params == key.object_type.type_params)
1539                        }).unwrap_or(true)
1540            }))
1541    }
1542
1543    fn dynamic_field_iter(
1544        &self,
1545        parent: ObjectID,
1546        cursor: Option<ObjectID>,
1547    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1548    {
1549        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1550        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1551        let iter = self
1552            .dynamic_field
1553            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1554            .map_ok(|(key, ())| key);
1555        Ok(iter)
1556    }
1557
1558    fn get_coin_info(
1559        &self,
1560        coin_type: &StructTag,
1561    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1562        let key = CoinIndexKey {
1563            coin_type: coin_type.to_owned(),
1564        };
1565        self.coin.get(&key)
1566    }
1567
1568    fn get_balance(
1569        &self,
1570        owner: &SuiAddress,
1571        coin_type: &StructTag,
1572    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1573        let key = BalanceKey {
1574            owner: owner.to_owned(),
1575            coin_type: coin_type.to_owned(),
1576        };
1577        self.balance.get(&key)
1578    }
1579
1580    fn balance_iter(
1581        &self,
1582        owner: SuiAddress,
1583        cursor: Option<BalanceKey>,
1584    ) -> Result<
1585        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1586        TypedStoreError,
1587    > {
1588        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1589            owner,
1590            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1591        });
1592
1593        Ok(self
1594            .balance
1595            .safe_iter_with_bounds(Some(lower_bound), None)
1596            .scan((), move |_, item| {
1597                match item {
1598                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1599                    Ok(_) => None,          // Different owner, stop iteration
1600                    Err(e) => Some(Err(e)), // Propagate error
1601                }
1602            }))
1603    }
1604
1605    fn package_versions_iter(
1606        &self,
1607        original_id: ObjectID,
1608        cursor: Option<u64>,
1609    ) -> Result<
1610        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1611        TypedStoreError,
1612    > {
1613        let lower_bound = PackageVersionKey {
1614            original_package_id: original_id,
1615            version: cursor.unwrap_or(0),
1616        };
1617        let upper_bound = PackageVersionKey {
1618            original_package_id: original_id,
1619            version: u64::MAX,
1620        };
1621
1622        Ok(self
1623            .package_version
1624            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1625    }
1626}
1627
/// Handle to the on-disk rpc-index database together with the in-memory state needed to
/// stage checkpoint updates and gate ledger history indexing.
pub struct RpcIndexStore {
    /// The typed column-family tables backing every index query in this store.
    tables: IndexStoreTables,
    /// Write batches produced by `index_checkpoint`, keyed by checkpoint sequence number,
    /// waiting to be written by `commit_update_for_checkpoint` in ascending order.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    /// Shared with the bitmap compaction filters. Advanced by `prune()` after
    /// the corresponding watermark batch commits, so compactions never see a
    /// value that hasn't been persisted.
    ledger_history_pruning_watermark: Arc<AtomicU64>,
    /// True iff this rpc-index DB was built with ledger history indexing
    /// enabled. Derived once at open from the persisted `settings` CF and used
    /// as the gate for forward indexing and pruning.
    ledger_history_enabled: bool,
}
1640
1641impl RpcIndexStore {
1642    /// Given the provided directory, construct the path to the db
1643    fn db_path(dir: &Path) -> PathBuf {
1644        dir.join("rpc-index")
1645    }
1646
1647    pub async fn new(
1648        dir: &Path,
1649        authority_store: &AuthorityStore,
1650        checkpoint_store: &CheckpointStore,
1651        epoch_store: &AuthorityPerEpochStore,
1652        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1653        rpc_config: sui_config::RpcConfig,
1654    ) -> Self {
1655        // Internal-only tx-seq floor, hydrated from disk on open.
1656        let ledger_history_pruning_watermark = Arc::new(AtomicU64::new(0));
1657        let index_options = IndexStoreOptions {
1658            pruning_tx_seq_exclusive: ledger_history_pruning_watermark,
1659        };
1660
1661        Self::new_with_options(
1662            dir,
1663            authority_store,
1664            checkpoint_store,
1665            epoch_store,
1666            package_store,
1667            index_options,
1668            rpc_config,
1669        )
1670        .await
1671    }
1672
    /// Opens the rpc-index DB under `dir`, rebuilding it from scratch when required.
    ///
    /// The DB is first opened with `index_options`. If
    /// `IndexStoreTables::needs_to_do_initialization` reports that a rebuild is needed, the
    /// existing DB is destroyed, reopened with RocksDB options tuned for bulk indexing
    /// (unordered writes, auto-compaction disabled, large write buffers), populated via
    /// `IndexStoreTables::init`, flushed, closed, and finally reopened with `index_options`.
    /// Otherwise the existing DB is kept, and ledger history indexing is disabled on it if it
    /// was persisted as enabled but `rpc_config` now turns it off.
    ///
    /// In both cases the pruning watermark shared through `index_options` is hydrated from
    /// the opened tables before the store is returned.
    ///
    /// # Panics
    ///
    /// Panics if the old DB cannot be destroyed, if only one of the column-family write
    /// buffer size / count overrides is configured, if initialization or the final flush
    /// fails, if the bulk-indexing DB handle is not released within 30 seconds, or if the
    /// reopened DB fails the version / ledger-history sanity checks.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        // Keep a handle to the shared watermark; `index_options` itself is consumed below.
        let ledger_history_atomic = index_options.pruning_tx_seq_exclusive.clone();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            // Rebuild if the schema or watermarks are stale, or ledger history
            // is being enabled (which needs a backfill).
            if tables
                .needs_to_do_initialization(checkpoint_store, rpc_config.ledger_history_indexing())
            {
                // Assigned inside the block below, once the write buffer size is known.
                let batch_size_limit;

                let mut tables = {
                    // Release our handle first so the old DB can be destroyed.
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Open the empty DB with `unordered_write`s enabled in order to get a ~3x
                    // speedup when indexing
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Allow CPU-intensive flushing operations to use all CPUs.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // We are disabling compaction for all column families below. This means we can
                    // also disable the backpressure that slows down writes when the number of L0
                    // files builds up since we will never compact them anyway.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // This is an upper bound on the amount of RAM the memtables can use across
                    // all column families.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        // Default to 80% of system RAM
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Create column family specific options.
                    let mut table_config_map = BTreeMap::new();

                    // Create options with compactions disabled and large write buffers.
                    // Each CF can use up to 25% of system RAM, but total is still limited by
                    // set_db_write_buffer_size configured above.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    // The two overrides are only honoured as a pair; a lone override panics.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Calculate buffer configuration: 25% of RAM split across buffers
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB minimum

                            // Target number of buffers based on CPU count
                            // More CPUs = more parallel flushing capability
                            let target_buffer_count = num_cpus::get().max(2);

                            // Aim for CPU-based buffer count, but reduce if it would make buffers too small
                            //   For example:
                            // - 128GB RAM, 32 CPUs: 32GB per CF / 32 buffers = 1GB each
                            // - 16GB RAM, 8 CPUs: 4GB per CF / 8 buffers = 512MB each
                            // - 4GB RAM, 8 CPUs: 1GB per CF / 64MB min = ~16 buffers of 64MB each
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            // `target_buffer_count` is at least 2, so the clamp range is valid.
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Calculate batch size limit: default to half the buffer size or 128MB, whichever is smaller
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27; // 128MB
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply cf_options to all tables
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // Override Balance options with the merge operator
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    // The bitmap CFs get a union merge operator plus a compaction filter that
                    // reads the shared pruning watermark.
                    let bitmap_filter_tx = BitmapCompactionFilter::new(
                        index_options.pruning_tx_seq_exclusive.clone(),
                        BitmapKind::Transaction,
                    );
                    let bitmap_filter_event = BitmapCompactionFilter::new(
                        index_options.pruning_tx_seq_exclusive.clone(),
                        BitmapKind::Event,
                    );
                    let mut transaction_bitmap_opts = cf_options.clone();
                    transaction_bitmap_opts = transaction_bitmap_opts
                        .set_merge_operator_associative(
                            "bitmap_union_merge",
                            bitmap_union_merge_operator,
                        );
                    transaction_bitmap_opts.options.set_compaction_filter(
                        "transaction_bitmap_filter",
                        move |_level, key, value| bitmap_filter_tx.filter(key, value),
                    );
                    table_config_map
                        .insert("transaction_bitmap".to_string(), transaction_bitmap_opts);

                    let mut event_bitmap_opts = cf_options.clone();
                    event_bitmap_opts = event_bitmap_opts.set_merge_operator_associative(
                        "bitmap_union_merge",
                        bitmap_union_merge_operator,
                    );
                    event_bitmap_opts
                        .options
                        .set_compaction_filter("event_bitmap_filter", move |_level, key, value| {
                            bitmap_filter_event.filter(key, value)
                        });
                    table_config_map.insert("event_bitmap".to_string(), event_bitmap_opts);

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                // Flush all data to disk before dropping tables.
                // This is critical because WAL is disabled during bulk indexing.
                // Note we only need to call flush on one table because all tables share the same
                // underlying database.
                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Track the DB through a weak handle so we can tell when every strong
                // reference is gone after dropping our tables.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                // Poll (every 100ms, for up to 30s) until the bulk-indexing DB handle has
                // been fully released before reopening the same path.
                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen the DB with default options (eg without `unordered_write`s enabled)
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: verify the schema version and feature settings
                // were persisted correctly.
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {:#x}, found {:#x}",
                    CURRENT_DB_VERSION, stored_version.version
                );
                assert_eq!(
                    reopened_tables.persisted_ledger_history_indexing(),
                    rpc_config.ledger_history_indexing(),
                    "ledger-history setting mismatch after flush and reopen"
                );

                reopened_tables
            } else {
                // No rebuild needed. If ledger history was on and is now off,
                // drop just those CFs rather than reindexing everything.
                if tables.persisted_ledger_history_indexing()
                    && !rpc_config.ledger_history_indexing()
                {
                    tables
                        .disable_ledger_history_indexing()
                        .expect("unable to disable ledger history indexing");
                }
                tables
            }
        };

        // Hydrate before compaction filters can observe the default 0 floor.
        Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);

        // `ledger_history_enabled` is derived from the persisted `settings` CF,
        // not directly from config.
        let ledger_history_enabled = tables.persisted_ledger_history_indexing();
        debug_assert_eq!(
            ledger_history_enabled,
            rpc_config.ledger_history_indexing(),
            "ledger_history_enabled (from settings CF) must match the configured ledger_history_indexing flag"
        );

        Self {
            tables,
            pending_updates: Default::default(),
            ledger_history_pruning_watermark: ledger_history_atomic,
            ledger_history_enabled,
        }
    }
1977
1978    /// Hydrate the tx-seq pruning floor from the first live `tx_seq_digest` key.
1979    fn hydrate_ledger_history_pruning_atomic(tables: &IndexStoreTables, atomic: &Arc<AtomicU64>) {
1980        let persisted = tables.first_tx_seq_digest_key().ok().flatten().unwrap_or(0);
1981        atomic.store(persisted, Ordering::Relaxed);
1982    }
1983
1984    pub fn new_without_init(dir: &Path) -> Self {
1985        let path = Self::db_path(dir);
1986
1987        // Keep already-built ledger history indexes prunable in offline paths.
1988        let ledger_history_atomic = Arc::new(AtomicU64::new(0));
1989        let index_options = IndexStoreOptions {
1990            pruning_tx_seq_exclusive: ledger_history_atomic.clone(),
1991        };
1992        let tables = IndexStoreTables::open_with_index_options(path, index_options);
1993
1994        let ledger_history_enabled = tables.persisted_ledger_history_indexing();
1995        Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);
1996
1997        Self {
1998            tables,
1999            pending_updates: Default::default(),
2000            ledger_history_pruning_watermark: ledger_history_atomic,
2001            ledger_history_enabled,
2002        }
2003    }
2004
2005    pub fn prune(
2006        &self,
2007        pruned_checkpoint_watermark: u64,
2008        pruned_tx_seq_exclusive: u64,
2009    ) -> Result<(), TypedStoreError> {
2010        self.tables.prune(
2011            pruned_checkpoint_watermark,
2012            pruned_tx_seq_exclusive,
2013            self.ledger_history_enabled,
2014            &self.ledger_history_pruning_watermark,
2015        )
2016    }
2017
2018    /// Index a checkpoint and stage the index updated in `pending_updates`.
2019    ///
2020    /// Updates will not be committed to the database until `commit_update_for_checkpoint` is
2021    /// called.
2022    #[tracing::instrument(
2023        skip_all,
2024        fields(checkpoint = checkpoint.summary.sequence_number)
2025    )]
2026    pub fn index_checkpoint(&self, checkpoint: &Checkpoint) {
2027        let sequence_number = checkpoint.summary.sequence_number;
2028        let batch = self
2029            .tables
2030            .index_checkpoint(checkpoint, self.ledger_history_enabled)
2031            .expect("db error");
2032
2033        self.pending_updates
2034            .lock()
2035            .unwrap()
2036            .insert(sequence_number, batch);
2037    }
2038
2039    /// Commits the pending updates for the provided checkpoint number.
2040    ///
2041    /// Invariants:
2042    /// - `index_checkpoint` must have been called for the provided checkpoint
2043    /// - Callers of this function must ensure that it is called for each checkpoint in sequential
2044    ///   order. This will panic if the provided checkpoint does not match the expected next
2045    ///   checkpoint to commit.
2046    #[tracing::instrument(skip(self))]
2047    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
2048        let next_batch = self.pending_updates.lock().unwrap().pop_first();
2049
2050        // Its expected that the next batch exists
2051        let (next_sequence_number, batch) = next_batch.unwrap();
2052        assert_eq!(
2053            checkpoint, next_sequence_number,
2054            "commit_update_for_checkpoint must be called in order"
2055        );
2056
2057        Ok(batch.write()?)
2058    }
2059
2060    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
2061        self.tables.get_epoch_info(epoch)
2062    }
2063
2064    pub fn owner_iter(
2065        &self,
2066        owner: SuiAddress,
2067        object_type: Option<StructTag>,
2068        cursor: Option<OwnerIndexKey>,
2069    ) -> Result<
2070        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
2071        TypedStoreError,
2072    > {
2073        self.tables.owner_iter(owner, object_type, cursor)
2074    }
2075
2076    pub fn dynamic_field_iter(
2077        &self,
2078        parent: ObjectID,
2079        cursor: Option<ObjectID>,
2080    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
2081    {
2082        self.tables.dynamic_field_iter(parent, cursor)
2083    }
2084
2085    pub fn get_coin_info(
2086        &self,
2087        coin_type: &StructTag,
2088    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
2089        self.tables.get_coin_info(coin_type)
2090    }
2091
2092    pub fn get_balance(
2093        &self,
2094        owner: &SuiAddress,
2095        coin_type: &StructTag,
2096    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
2097        self.tables.get_balance(owner, coin_type)
2098    }
2099
2100    pub fn balance_iter(
2101        &self,
2102        owner: SuiAddress,
2103        cursor: Option<BalanceKey>,
2104    ) -> Result<
2105        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
2106        TypedStoreError,
2107    > {
2108        self.tables.balance_iter(owner, cursor)
2109    }
2110
2111    pub fn package_versions_iter(
2112        &self,
2113        original_id: ObjectID,
2114        cursor: Option<u64>,
2115    ) -> Result<
2116        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
2117        TypedStoreError,
2118    > {
2119        self.tables.package_versions_iter(original_id, cursor)
2120    }
2121
2122    pub fn get_highest_indexed_checkpoint_seq_number(
2123        &self,
2124    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
2125        self.tables.watermark.get(&Watermark::Indexed)
2126    }
2127
2128    fn ensure_ledger_history_enabled(&self) -> Result<(), TypedStoreError> {
2129        if self.ledger_history_enabled {
2130            Ok(())
2131        } else {
2132            Err(TypedStoreError::SerializationError(
2133                "ledger history indexing is disabled".to_owned(),
2134            ))
2135        }
2136    }
2137
2138    pub fn ledger_tx_seq_digest(
2139        &self,
2140        tx_seq: u64,
2141    ) -> Result<Option<LedgerTxSeqDigest>, TypedStoreError> {
2142        self.ensure_ledger_history_enabled()?;
2143        Ok(self
2144            .tables
2145            .tx_seq_digest
2146            .get(&tx_seq)?
2147            .map(|info| ledger_tx_seq_digest(tx_seq, info)))
2148    }
2149
2150    pub fn ledger_tx_seq_digest_multi_get(
2151        &self,
2152        tx_seqs: &[u64],
2153    ) -> Result<Vec<Option<LedgerTxSeqDigest>>, TypedStoreError> {
2154        self.ensure_ledger_history_enabled()?;
2155        let rows = self
2156            .tables
2157            .tx_seq_digest
2158            .multi_get(tx_seqs)?
2159            .into_iter()
2160            .zip_debug_eq(tx_seqs.iter().copied())
2161            .map(|(info, tx_seq)| info.map(|info| ledger_tx_seq_digest(tx_seq, info)))
2162            .collect();
2163        Ok(rows)
2164    }
2165
2166    pub fn ledger_tx_seq_digest_iter(
2167        &self,
2168        start: u64,
2169        end_exclusive: u64,
2170        descending: bool,
2171    ) -> Result<LedgerTxSeqDigestIterator<'_>, TypedStoreError> {
2172        self.ensure_ledger_history_enabled()?;
2173        if start >= end_exclusive {
2174            return Ok(Box::new(std::iter::empty()));
2175        }
2176
2177        let iter = if descending {
2178            let upper = end_exclusive - 1;
2179            self.tables
2180                .tx_seq_digest
2181                .reversed_safe_iter_with_bounds(Some(start), Some(upper))?
2182        } else {
2183            self.tables
2184                .tx_seq_digest
2185                .safe_iter_with_bounds(Some(start), Some(end_exclusive))
2186        };
2187
2188        Ok(Box::new(iter.map(|result| {
2189            result.map(|(tx_seq, info)| ledger_tx_seq_digest(tx_seq, info))
2190        })))
2191    }
2192
2193    pub fn transaction_bitmap_bucket_iter(
2194        &self,
2195        dimension_key: Vec<u8>,
2196        start_bucket: u64,
2197        end_bucket_exclusive: u64,
2198        descending: bool,
2199    ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2200        self.ensure_ledger_history_enabled()?;
2201        Self::bitmap_bucket_iter(
2202            &self.tables.transaction_bitmap,
2203            dimension_key,
2204            start_bucket,
2205            end_bucket_exclusive,
2206            descending,
2207        )
2208    }
2209
2210    pub fn event_bitmap_bucket_iter(
2211        &self,
2212        dimension_key: Vec<u8>,
2213        start_bucket: u64,
2214        end_bucket_exclusive: u64,
2215        descending: bool,
2216    ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2217        self.ensure_ledger_history_enabled()?;
2218        Self::bitmap_bucket_iter(
2219            &self.tables.event_bitmap,
2220            dimension_key,
2221            start_bucket,
2222            end_bucket_exclusive,
2223            descending,
2224        )
2225    }
2226
2227    fn bitmap_bucket_iter(
2228        table: &DBMap<BitmapIndexKey, BitmapBlob>,
2229        dimension_key: Vec<u8>,
2230        start_bucket: u64,
2231        end_bucket_exclusive: u64,
2232        descending: bool,
2233    ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2234        if start_bucket >= end_bucket_exclusive {
2235            return Ok(Box::new(std::iter::empty()));
2236        }
2237
2238        let lower = BitmapIndexKey {
2239            dimension_key: dimension_key.clone(),
2240            bucket_id: start_bucket,
2241        };
2242        let upper_exclusive = BitmapIndexKey {
2243            dimension_key,
2244            bucket_id: end_bucket_exclusive,
2245        };
2246        let upper_inclusive = BitmapIndexKey {
2247            dimension_key: upper_exclusive.dimension_key.clone(),
2248            bucket_id: end_bucket_exclusive - 1,
2249        };
2250
2251        let iter: Box<
2252            dyn Iterator<Item = Result<(BitmapIndexKey, BitmapBlob), TypedStoreError>> + '_,
2253        > = if descending {
2254            table.reversed_safe_iter_with_bounds(Some(lower), Some(upper_inclusive))?
2255        } else {
2256            table.safe_iter_with_bounds(Some(lower), Some(upper_exclusive))
2257        };
2258
2259        Ok(Box::new(iter.map(|result| {
2260            result.and_then(|(key, blob)| decode_ledger_bitmap_bucket(key, blob))
2261        })))
2262    }
2263}
2264
2265/// Objects that existed before this transaction but no longer exist after
2266/// (deleted or wrapped). Mirrors `CheckpointTransaction::removed_objects_pre_version`
2267/// for `ExecutedTransaction`, which stores its objects in a shared `ObjectSet`
2268/// keyed by `(id, version)` rather than in dense per-tx vectors.
2269fn tx_removed_objects_pre_version<'a>(
2270    tx: &'a ExecutedTransaction,
2271    object_set: &'a ObjectSet,
2272) -> impl Iterator<Item = &'a Object> + 'a {
2273    tx.effects
2274        .object_changes()
2275        .into_iter()
2276        .filter_map(
2277            move |change| match (change.input_version, change.output_version) {
2278                (Some(input_version), None) => object_set.get(&ObjectKey(change.id, input_version)),
2279                _ => None,
2280            },
2281        )
2282}
2283
2284/// Pairs of `(output_object, optional_input_object)` for every changed object
2285/// (mutated, created, or unwrapped). Mirrors `CheckpointTransaction::changed_objects`
2286/// for `ExecutedTransaction`.
2287fn tx_changed_objects<'a>(
2288    tx: &'a ExecutedTransaction,
2289    object_set: &'a ObjectSet,
2290) -> impl Iterator<Item = (&'a Object, Option<&'a Object>)> + 'a {
2291    tx.effects
2292        .object_changes()
2293        .into_iter()
2294        .filter_map(move |change| {
2295            let output = change
2296                .output_version
2297                .and_then(|v| object_set.get(&ObjectKey(change.id, v)))?;
2298            let input = change
2299                .input_version
2300                .and_then(|v| object_set.get(&ObjectKey(change.id, v)));
2301            Some((output, input))
2302        })
2303}
2304
2305fn should_index_dynamic_field(object: &Object) -> bool {
2306    // Skip any objects that aren't of type `Field<Name, Value>`
2307    //
2308    // All dynamic fields are of type:
2309    //   - Field<Name, Value> for dynamic fields
2310    //   - Field<Wrapper<Name>, ID>> for dynamic field objects where the ID is the id of the pointed
2311    //   to object
2312    //
2313    object
2314        .data
2315        .try_as_move()
2316        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
2317}
2318
2319fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
2320    use sui_types::coin::CoinMetadata;
2321    use sui_types::coin::RegulatedCoinMetadata;
2322    use sui_types::coin::TreasuryCap;
2323
2324    let object_type = object.type_().and_then(MoveObjectType::other)?;
2325
2326    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
2327        return Some((
2328            CoinIndexKey { coin_type },
2329            CoinIndexInfo {
2330                coin_metadata_object_id: Some(object.id()),
2331                treasury_object_id: None,
2332                regulated_coin_metadata_object_id: None,
2333            },
2334        ));
2335    }
2336
2337    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
2338        return Some((
2339            CoinIndexKey { coin_type },
2340            CoinIndexInfo {
2341                coin_metadata_object_id: None,
2342                treasury_object_id: Some(object.id()),
2343                regulated_coin_metadata_object_id: None,
2344            },
2345        ));
2346    }
2347
2348    if let Some(coin_type) =
2349        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
2350    {
2351        return Some((
2352            CoinIndexKey { coin_type },
2353            CoinIndexInfo {
2354                coin_metadata_object_id: None,
2355                treasury_object_id: None,
2356                regulated_coin_metadata_object_id: Some(object.id()),
2357            },
2358        ));
2359    }
2360
2361    None
2362}
2363
/// Factory for [`RpcLiveObjectIndexer`]s: its `ParMakeLiveObjectIndexer` impl
/// builds a fresh indexer on every `make_live_object_indexer` call, each
/// sharing the references and limit held here.
struct RpcParLiveObjectSetIndexer<'a> {
    // Index tables the produced indexers write into.
    tables: &'a IndexStoreTables,
    // Coin index map handed (by reference) to every produced indexer, guarded
    // by a mutex.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Batch size threshold (compared against `size_in_bytes()`) copied into
    // each produced indexer.
    batch_size_limit: usize,
}
2369
/// Indexes live objects one at a time into the RPC index tables, buffering
/// writes in a `DBBatch` (see its `LiveObjectIndexer` impl).
struct RpcLiveObjectIndexer<'a> {
    // Index tables being populated.
    tables: &'a IndexStoreTables,
    // Pending writes; written out once `size_in_bytes()` reaches
    // `batch_size_limit`, and again in `finish`.
    batch: typed_store::rocks::DBBatch,
    // Mutex-guarded coin index map; entries for the same key are merged.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Balance deltas accumulated in memory before being added to `batch` as
    // merge operands for the `balance` table.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    // Byte threshold at which `batch` is written to the DB.
    batch_size_limit: usize,
}
2377
2378impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
2379    type ObjectIndexer = RpcLiveObjectIndexer<'a>;
2380
2381    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
2382        RpcLiveObjectIndexer {
2383            tables: self.tables,
2384            batch: self.tables.owner.batch(),
2385            coin_index: self.coin_index,
2386            balance_changes: HashMap::new(),
2387            batch_size_limit: self.batch_size_limit,
2388        }
2389    }
2390}
2391
2392impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
2393    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
2394        match object.owner {
2395            // Owner Index
2396            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
2397                let owner_key = OwnerIndexKey::from_object(&object);
2398                let owner_info = OwnerIndexInfo::new(&object);
2399                self.batch
2400                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
2401
2402                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
2403                    let balance_key = BalanceKey { owner, coin_type };
2404                    let balance_info = BalanceIndexInfo {
2405                        coin_balance_delta: value.into(),
2406                        address_balance_delta: 0,
2407                    };
2408                    self.balance_changes
2409                        .entry(balance_key)
2410                        .or_default()
2411                        .merge_delta(&balance_info);
2412
2413                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2414                        self.batch.partial_merge_batch(
2415                            &self.tables.balance,
2416                            std::mem::take(&mut self.balance_changes),
2417                        )?;
2418                    }
2419                }
2420            }
2421
2422            // Dynamic Field Index
2423            Owner::ObjectOwner(parent) => {
2424                if should_index_dynamic_field(&object) {
2425                    let field_key = DynamicFieldKey::new(parent, object.id());
2426                    self.batch
2427                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
2428                }
2429
2430                // Index address balances
2431                if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
2432                    && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
2433                {
2434                    let balance_key = BalanceKey { owner, coin_type };
2435                    let balance_info = BalanceIndexInfo {
2436                        coin_balance_delta: 0,
2437                        address_balance_delta: balance,
2438                    };
2439                    self.balance_changes
2440                        .entry(balance_key)
2441                        .or_default()
2442                        .merge_delta(&balance_info);
2443
2444                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2445                        self.batch.partial_merge_batch(
2446                            &self.tables.balance,
2447                            std::mem::take(&mut self.balance_changes),
2448                        )?;
2449                    }
2450                }
2451            }
2452
2453            Owner::Shared { .. } | Owner::Immutable => {}
2454
2455            Owner::Party { .. } => {
2456                // TODO(Party WIP)
2457                todo!("Party WIP");
2458                // We could maybe look at non-default permissions for "owners"?
2459            }
2460        }
2461
2462        // Look for CoinMetadata<T> and TreasuryCap<T> objects
2463        if let Some((key, value)) = try_create_coin_index_info(&object) {
2464            use std::collections::hash_map::Entry;
2465
2466            match self.coin_index.lock().unwrap().entry(key) {
2467                Entry::Occupied(mut o) => {
2468                    o.get_mut().merge(value);
2469                }
2470                Entry::Vacant(v) => {
2471                    v.insert(value);
2472                }
2473            }
2474        }
2475
2476        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
2477            self.batch
2478                .insert_batch(&self.tables.package_version, [(key, info)])?;
2479        }
2480
2481        // If the batch size grows to greater than the limit then write out to the DB so that the
2482        // data we need to hold in memory doesn't grow unbounded.
2483        if self.batch.size_in_bytes() >= self.batch_size_limit {
2484            std::mem::replace(&mut self.batch, self.tables.owner.batch())
2485                .write_opt(bulk_ingestion_write_options())?;
2486        }
2487
2488        Ok(())
2489    }
2490
2491    fn finish(mut self) -> Result<(), StorageError> {
2492        self.batch.partial_merge_batch(
2493            &self.tables.balance,
2494            std::mem::take(&mut self.balance_changes),
2495        )?;
2496        self.batch.write_opt(bulk_ingestion_write_options())?;
2497        Ok(())
2498    }
2499}
2500
2501// TODO figure out a way to dedup this logic. Today we'd need to do quite a bit of refactoring to
2502// make it possible.
2503/// Load a full `Checkpoint` for `checkpoint` from local storage. Sibling
2504/// of [`sparse_checkpoint_for_epoch_backfill`] that returns data for
2505/// every cp (not just genesis / EoE) and always loads transaction events.
2506///
2507/// Returns `Ok(None)` if the cp's summary or contents are not present
2508/// locally (e.g. pruned out of the underlying store).
2509fn full_checkpoint_for_backfill(
2510    authority_store: &AuthorityStore,
2511    checkpoint_store: &CheckpointStore,
2512    checkpoint: u64,
2513) -> Result<Option<Checkpoint>, StorageError> {
2514    let Some(summary) = checkpoint_store.get_checkpoint_by_sequence_number(checkpoint)? else {
2515        return Ok(None);
2516    };
2517    let Some(contents) = checkpoint_store.get_checkpoint_contents(&summary.content_digest)? else {
2518        return Ok(None);
2519    };
2520
2521    // Always load events: event-space dimensions need them, and tx-space
2522    // dimensions include EmitModule / EventType / EventStreamHead which are
2523    // sourced from events too.
2524    let (transactions, object_set) = load_executed_transactions(authority_store, &contents, true)?;
2525
2526    Ok(Some(Checkpoint {
2527        summary: summary.into(),
2528        contents,
2529        transactions,
2530        object_set,
2531    }))
2532}
2533
2534fn sparse_checkpoint_for_epoch_backfill(
2535    authority_store: &AuthorityStore,
2536    checkpoint_store: &CheckpointStore,
2537    checkpoint: u64,
2538) -> Result<Option<Checkpoint>, StorageError> {
2539    let summary = checkpoint_store
2540        .get_checkpoint_by_sequence_number(checkpoint)?
2541        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2542
2543    // Only load genesis and end of epoch checkpoints
2544    if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
2545        return Ok(None);
2546    }
2547
2548    let contents = checkpoint_store
2549        .get_checkpoint_contents(&summary.content_digest)?
2550        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2551
2552    let (transactions, object_set) = load_executed_transactions(authority_store, &contents, false)?;
2553
2554    Ok(Some(Checkpoint {
2555        summary: summary.into(),
2556        contents,
2557        transactions,
2558        object_set,
2559    }))
2560}
2561
2562/// Load `ExecutedTransaction`s for every digest in `contents`, alongside an
2563/// `ObjectSet` populated with their input and output objects. `Checkpoint`
2564/// stores objects in a shared keyed set rather than the per-tx vectors used
2565/// by the old `CheckpointTransaction`, so this is the equivalent shape for
2566/// the backfill loaders to return.
2567fn load_executed_transactions(
2568    authority_store: &AuthorityStore,
2569    contents: &sui_types::messages_checkpoint::CheckpointContents,
2570    load_events: bool,
2571) -> Result<(Vec<ExecutedTransaction>, ObjectSet), StorageError> {
2572    let transaction_digests = contents
2573        .iter()
2574        .map(|execution_digests| execution_digests.transaction)
2575        .collect::<Vec<_>>();
2576    let transactions = authority_store
2577        .multi_get_transaction_blocks(&transaction_digests)?
2578        .into_iter()
2579        .map(|maybe_transaction| {
2580            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
2581        })
2582        .collect::<Result<Vec<_>, _>>()?;
2583
2584    let effects = authority_store
2585        .multi_get_executed_effects(&transaction_digests)?
2586        .into_iter()
2587        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
2588        .collect::<Result<Vec<_>, _>>()?;
2589
2590    let events = if load_events {
2591        authority_store
2592            .multi_get_events(&transaction_digests)
2593            .map_err(|e| StorageError::custom(e.to_string()))?
2594    } else {
2595        vec![None; transaction_digests.len()]
2596    };
2597
2598    let mut full_transactions = Vec::with_capacity(transactions.len());
2599    let mut object_set = ObjectSet::default();
2600    for ((tx, fx), ev) in transactions
2601        .into_iter()
2602        .zip_debug_eq(effects)
2603        .zip_debug_eq(events)
2604    {
2605        let input_objects =
2606            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
2607        let output_objects =
2608            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
2609
2610        for obj in input_objects.into_iter().chain(output_objects.into_iter()) {
2611            object_set.insert(obj);
2612        }
2613
2614        let sender_signed = sui_types::transaction::Transaction::from(tx)
2615            .into_data()
2616            .into_inner();
2617        full_transactions.push(ExecutedTransaction {
2618            transaction: sender_signed.intent_message.value,
2619            signatures: sender_signed.tx_signatures,
2620            effects: fx,
2621            events: ev,
2622            // The backfill index paths (`index_epoch` and
2623            // `write_ledger_history_rows_for_checkpoint`) only read objects
2624            // through `effects.object_changes()`, which never reaches
2625            // unchanged loaded runtime objects. Leaving this empty avoids a
2626            // pointless lookup per checkpoint.
2627            unchanged_loaded_runtime_objects: Vec::new(),
2628        });
2629    }
2630
2631    Ok((full_transactions, object_set))
2632}
2633
2634fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
2635    match Coin::extract_balance_if_coin(object) {
2636        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
2637        Ok(Some(_)) => {
2638            debug!("Coin object {} has non-struct type tag", object.id());
2639            Ok(None)
2640        }
2641        Ok(None) => {
2642            // Not a coin
2643            Ok(None)
2644        }
2645        Err(e) => {
2646            // Corrupted coin data
2647            Err(StorageError::custom(format!(
2648                "Failed to deserialize coin object {}: {}",
2649                object.id(),
2650                e
2651            )))
2652        }
2653    }
2654}
2655
2656fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
2657    let move_object = object.data.try_as_move()?;
2658
2659    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
2660    else {
2661        return None;
2662    };
2663
2664    let (key, value): (
2665        sui_types::accumulator_root::AccumulatorKey,
2666        sui_types::accumulator_root::AccumulatorValue,
2667    ) = move_object.try_into().ok()?;
2668
2669    let balance = value.as_u128()? as i128;
2670    if balance <= 0 {
2671        return None;
2672    }
2673
2674    Some((key.owner, *coin_type, balance))
2675}
2676
2677#[cfg(test)]
2678mod tests {
2679    use super::*;
2680    use std::sync::atomic::AtomicU64;
2681
    /// Every column family opened via `open_with_index_options` must have the
    /// `disable_write_throttling` override applied. The typed-store derive
    /// macro silently falls back to bare `default_db_options()` for any CF
    /// missing from `tables_db_options_override`, reverting to RocksDB's
    /// default stall triggers (slowdown=20, stop=36) — small enough that ~80
    /// L0 files would stop writes entirely. RocksDB persists the effective
    /// per-CF options to an `OPTIONS-NNNNNN` file at open; parse it to verify
    /// every CF received the override.
    #[tokio::test]
    async fn open_with_index_options_overrides_every_cf() {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("rpc-index");

        // Bound to a named variable so the opened tables live until the end
        // of the test rather than being dropped immediately.
        let _tables =
            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());

        // Iterate the CFs RocksDB actually wrote to the OPTIONS file rather
        // than the schema-declared set, since deprecated CFs are dropped at
        // open time and never appear in OPTIONS. RocksDB always writes a
        // `default` CF; we exclude it because typed-store doesn't store data
        // there and we don't configure it.
        let per_cf = parse_cf_options(&db_path);
        assert!(
            !per_cf.is_empty(),
            "expected at least one CFOptions section in OPTIONS file"
        );
        for (cf_name, opts) in &per_cf {
            if cf_name == "default" {
                continue;
            }
            // Option name / expected value pairs, compared as strings against
            // what was parsed from the OPTIONS file. NOTE(review): presumably
            // these are exactly the values `disable_write_throttling` sets —
            // keep in sync with it.
            for (key, expected) in [
                ("level0_slowdown_writes_trigger", "512"),
                ("level0_stop_writes_trigger", "1024"),
                ("soft_pending_compaction_bytes_limit", "0"),
                ("hard_pending_compaction_bytes_limit", "0"),
            ] {
                let actual = opts
                    .get(key)
                    .unwrap_or_else(|| panic!("cf `{cf_name}` missing `{key}`"));
                assert_eq!(
                    actual, expected,
                    "cf `{cf_name}` has `{key}={actual}`, expected `{expected}` — \
                     the typed-store override map likely doesn't cover this CF"
                );
            }
        }
    }
2729
2730    #[test]
2731    fn checked_encode_event_seq_rejects_unrepresentable_values() {
2732        assert!(
2733            checked_encode_event_seq(0, MAX_EVENTS_PER_TX).is_err(),
2734            "event_idx at MAX_EVENTS_PER_TX must be rejected"
2735        );
2736        assert!(
2737            checked_encode_event_seq(MAX_TX_SEQ + 1, 0).is_err(),
2738            "tx_seq past MAX_TX_SEQ must be rejected"
2739        );
2740    }
2741
2742    /// Compaction filter math: tx-bitmap whole-bucket removability around
2743    /// the tx == 0 boundary. With the pruning atomic at 1, only tx_seq 0
2744    /// is gone. Bucket 0 spans tx_seqs [0, 65_536), so it is NOT entirely
2745    /// pruned and must be kept. This is exactly the off-by-one case the
2746    /// exclusive floor is supposed to make explicit.
2747    #[test]
2748    fn bitmap_filter_keeps_bucket_with_live_tx_above_zero_watermark() {
2749        let watermark = Arc::new(AtomicU64::new(1));
2750        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2751
2752        let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2753            dimension_key: vec![1, 2, 3],
2754            bucket_id: 0,
2755        });
2756        assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2757
2758        // Once the watermark advances to TX_BUCKET_SIZE, bucket 0 becomes
2759        // fully prunable (highest tx in bucket 0 is 65_535, exclusive of
2760        // 65_536 means the next bucket starts there — everything below is
2761        // pruned).
2762        watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
2763        assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2764    }
2765
2766    /// Compaction filter math: event-bitmap removability uses
2767    /// `event_seq_lo(pruned_exclusive)` as the threshold, so its math is
2768    /// scaled by EVENT_BITS relative to tx-bitmap.
2769    #[test]
2770    fn bitmap_filter_event_bucket_uses_event_seq_lo() {
2771        let watermark = Arc::new(AtomicU64::new(0));
2772        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Event);
2773
2774        let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2775            dimension_key: vec![5],
2776            bucket_id: 0,
2777        });
2778        // Watermark 0: nothing pruned → keep.
2779        assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2780
2781        // The highest tx whose event_seq can fall in bucket 0 is
2782        // (EVENT_BUCKET_SIZE / MAX_EVENTS_PER_TX) - 1. Need watermark to
2783        // exceed that for bucket 0 to be fully prunable.
2784        let txs_per_bucket = EVENT_BUCKET_SIZE / MAX_EVENTS_PER_TX as u64;
2785        watermark.store(txs_per_bucket, Ordering::Relaxed);
2786        assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2787    }
2788
2789    /// Malformed keys must never be silently `Remove`d — silent data loss
2790    /// is much worse than a stuck row. A too-short key and a key with a
2791    /// bucket_id that would overflow the bucket-hi computation should
2792    /// both be kept.
2793    #[test]
2794    fn bitmap_filter_keeps_malformed_keys() {
2795        let watermark = Arc::new(AtomicU64::new(u64::MAX));
2796        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2797
2798        assert!(matches!(filter.filter(b"short", &[]), Decision::Keep));
2799        assert!(matches!(filter.filter(&[], &[]), Decision::Keep));
2800
2801        // bucket_id near u64::MAX would overflow `(b+1)*TX_BUCKET_SIZE`.
2802        // The checked math returns None → keep.
2803        let huge = typed_store::be_fix_int_ser(&BitmapIndexKey {
2804            dimension_key: vec![],
2805            bucket_id: u64::MAX - 1,
2806        });
2807        assert!(huge.len() >= 8);
2808        assert!(matches!(filter.filter(&huge, &[]), Decision::Keep));
2809    }
2810
2811    /// Round-trip the typed-store encoding: a `BitmapIndexKey` encoded via
2812    /// `be_fix_int_ser` ends with the bucket_id as 8 big-endian bytes that
2813    /// the compaction filter reads back. Guards against silent drift if
2814    /// typed-store changes its key serializer.
2815    #[test]
2816    fn bitmap_filter_decodes_typed_store_keys() {
2817        let watermark = Arc::new(AtomicU64::new(0));
2818        let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2819
2820        // Build a key with a bucket_id that, after advancing the watermark
2821        // far enough, would be removable. Confirm the filter agrees.
2822        let bucket_id = 7u64;
2823        let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2824            dimension_key: vec![0xAA, 0xBB, 0xCC],
2825            bucket_id,
2826        });
2827        // First, with watermark = 0, definitely keep.
2828        assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2829
2830        // Advance watermark past (bucket_id + 1) * TX_BUCKET_SIZE → remove.
2831        watermark.store((bucket_id + 1) * TX_BUCKET_SIZE, Ordering::Relaxed);
2832        assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2833    }
2834
2835    /// The merge operator must OR multiple operands into a single bitmap —
2836    /// not last-write-wins, which is what we'd get without it.
2837    #[test]
2838    fn bitmap_merge_operator_unions_operands() {
2839        let mut bm_a = RoaringBitmap::new();
2840        bm_a.insert(1);
2841        bm_a.insert(5);
2842        let blob_a = encode_bitmap_blob(&bm_a);
2843
2844        let mut bm_b = RoaringBitmap::new();
2845        bm_b.insert(5);
2846        bm_b.insert(7);
2847        let blob_b = encode_bitmap_blob(&bm_b);
2848
2849        let mut bm_c = RoaringBitmap::new();
2850        bm_c.insert(100);
2851        let blob_c = encode_bitmap_blob(&bm_c);
2852
2853        // Simulate rocksdb feeding [blob_b, blob_c] as operands with no
2854        // existing on-disk value (which is what happens on first merge into
2855        // a new key).
2856        //
2857        // We can't easily construct a `MergeOperands` from outside rocksdb,
2858        // so test the decode/encode round-trip via the helpers directly and
2859        // assert the union over decoded bitmaps. This validates the data
2860        // path the merge operator depends on; the operator's loop is a
2861        // trivial `acc |= bm` on top.
2862        let decoded_a = decode_bitmap_blob(&blob_a).expect("decode a");
2863        let decoded_b = decode_bitmap_blob(&blob_b).expect("decode b");
2864        let decoded_c = decode_bitmap_blob(&blob_c).expect("decode c");
2865        let unioned = decoded_a | decoded_b | decoded_c;
2866        let mut expected = RoaringBitmap::new();
2867        for b in [1, 5, 7, 100] {
2868            expected.insert(b);
2869        }
2870        assert_eq!(unioned, expected);
2871    }
2872
2873    /// End-to-end: write merge operands across multiple "checkpoints" into a
2874    /// real DBMap and confirm the merge operator unions them when read back.
2875    /// This is the only test that exercises the in-rocksdb merge path,
2876    /// since `bitmap_merge_operator_unions_operands` only round-trips the
2877    /// helpers.
2878    #[tokio::test]
2879    async fn bitmap_merge_operator_unions_across_writes() {
2880        let temp_dir = tempfile::tempdir().unwrap();
2881        let db_path = temp_dir.path().join("rpc-index");
2882        let tables =
2883            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2884
2885        let key = BitmapIndexKey {
2886            dimension_key: vec![1, 2, 3],
2887            bucket_id: 0,
2888        };
2889
2890        // Write three merge operands targeting the same key. Without the
2891        // merge operator, the last write would clobber the first two; with
2892        // it, all bits should be present.
2893        for bits in [vec![1u32, 2], vec![3, 4], vec![5, 6, 7]] {
2894            let mut bm = RoaringBitmap::new();
2895            for b in bits {
2896                bm.insert(b);
2897            }
2898            let mut batch = tables.transaction_bitmap.batch();
2899            batch
2900                .partial_merge_batch(
2901                    &tables.transaction_bitmap,
2902                    [(key.clone(), BitmapBlob::from(bm))],
2903                )
2904                .unwrap();
2905            batch.write().unwrap();
2906        }
2907
2908        let blob = tables
2909            .transaction_bitmap
2910            .get(&key)
2911            .unwrap()
2912            .expect("merged row should exist");
2913        let bm = RoaringBitmap::deserialize_from(&blob.0[..]).unwrap();
2914        let got: Vec<u32> = bm.iter().collect();
2915        assert_eq!(got, vec![1, 2, 3, 4, 5, 6, 7]);
2916    }
2917
2918    /// Whole-bucket compaction-filter removal: write bits to buckets 0 and
2919    /// 1, advance the pruning watermark past bucket 0 only, force a
2920    /// compaction, then assert bucket 0 is gone and bucket 1 survives.
2921    #[tokio::test]
2922    async fn bitmap_filter_removes_whole_bucket_after_compaction() {
2923        let temp_dir = tempfile::tempdir().unwrap();
2924        let db_path = temp_dir.path().join("rpc-index");
2925
2926        let watermark = Arc::new(AtomicU64::new(0));
2927        let index_options = IndexStoreOptions {
2928            pruning_tx_seq_exclusive: watermark.clone(),
2929        };
2930        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
2931
2932        let dim_key = vec![0x01, 0xAA];
2933        let k0 = BitmapIndexKey {
2934            dimension_key: dim_key.clone(),
2935            bucket_id: 0,
2936        };
2937        let k1 = BitmapIndexKey {
2938            dimension_key: dim_key.clone(),
2939            bucket_id: 1,
2940        };
2941        let mut bm = RoaringBitmap::new();
2942        bm.insert(0);
2943
2944        // Use a direct insert rather than a merge so we exercise the
2945        // compaction filter on a regular value. (Merge interaction is
2946        // covered by `bitmap_merge_operator_unions_across_writes`.)
2947        let blob = BitmapBlob::from(bm);
2948        let mut batch = tables.transaction_bitmap.batch();
2949        batch
2950            .insert_batch(
2951                &tables.transaction_bitmap,
2952                [(k0.clone(), blob.clone()), (k1.clone(), blob)],
2953            )
2954            .unwrap();
2955        batch.write().unwrap();
2956        tables.transaction_bitmap.flush().unwrap();
2957
2958        // Sanity-check: both buckets are present before advancing the
2959        // watermark.
2960        assert!(tables.transaction_bitmap.get(&k0).unwrap().is_some());
2961        assert!(tables.transaction_bitmap.get(&k1).unwrap().is_some());
2962
2963        // Advance watermark past bucket 0 but not past bucket 1.
2964        watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
2965
2966        // Compact the entire keyspace with raw byte bounds to ensure we
2967        // cover every encoded BitmapIndexKey, regardless of typed-store's
2968        // length-prefix width.
2969        tables
2970            .transaction_bitmap
2971            .compact_range_raw("transaction_bitmap", vec![], vec![0xFF; 128])
2972            .unwrap();
2973
2974        assert!(
2975            tables.transaction_bitmap.get(&k0).unwrap().is_none(),
2976            "bucket 0 should have been removed by the compaction filter"
2977        );
2978        assert!(
2979            tables.transaction_bitmap.get(&k1).unwrap().is_some(),
2980            "bucket 1 should still be present (only bucket 0 was below the watermark)"
2981        );
2982    }
2983
2984    /// The retained backfill range must surface missing cps as an error,
2985    /// not silently leave a permanent hole. This tests the
2986    /// `ok_or_else(...)` conversion isolated from the full init path.
2987    #[test]
2988    fn backfill_missing_cp_in_retained_range_is_error() {
2989        // Mirror the backfill closure: take the
2990        // `Result<Option<Checkpoint>>` from the loader, and require
2991        // `Some(checkpoint)` for every cp in the retained range.
2992        let checkpoint_range = 5u64..=10u64;
2993        let seq = 7u64;
2994        let loaded: Result<Option<Checkpoint>, StorageError> = Ok(None);
2995
2996        let result: Result<(), StorageError> = (|| {
2997            let checkpoint = loaded?.ok_or_else(|| {
2998                StorageError::missing(format!(
2999                    "ledger history backfill: checkpoint {seq} is missing from local storage \
3000                     but falls inside the retained backfill range {checkpoint_range:?}"
3001                ))
3002            })?;
3003            let _ = checkpoint;
3004            Ok(())
3005        })();
3006
3007        let err = result.expect_err("missing cp must error out, not silently succeed");
3008        let msg = err.to_string();
3009        assert!(
3010            msg.contains(&format!("checkpoint {seq}")),
3011            "error should name the missing cp: {msg}"
3012        );
3013        assert!(
3014            msg.contains("5..=10"),
3015            "error should name the retained range: {msg}"
3016        );
3017    }
3018
3019    /// Existing watermark rows on disk encode `Indexed` and `Pruned` as serde
3020    /// indexes 0 and 1. Reordering the enum (or inserting a variant before
3021    /// them) would shift those discriminants and silently misread on-disk
3022    /// rows. Hardcode the legacy bytes and confirm they still round-trip.
3023    #[test]
3024    fn legacy_watermark_bytes_still_deserialize() {
3025        // BCS encodes a unit enum variant as a ULEB128 of its index.
3026        let indexed_bytes = bcs::to_bytes(&Watermark::Indexed).unwrap();
3027        let pruned_bytes = bcs::to_bytes(&Watermark::Pruned).unwrap();
3028        assert_eq!(
3029            indexed_bytes,
3030            vec![0],
3031            "Watermark::Indexed must encode as 0"
3032        );
3033        assert_eq!(pruned_bytes, vec![1], "Watermark::Pruned must encode as 1");
3034
3035        // Feed the canonical legacy bytes to the deserializer and confirm
3036        // they still arrive at the right variants. This is the test that
3037        // would catch an accidental reorder of the enum.
3038        let decoded_indexed: Watermark = bcs::from_bytes(&[0]).unwrap();
3039        let decoded_pruned: Watermark = bcs::from_bytes(&[1]).unwrap();
3040        assert!(matches!(decoded_indexed, Watermark::Indexed));
3041        assert!(matches!(decoded_pruned, Watermark::Pruned));
3042    }
3043
3044    /// The schema version and the ledger-history feature flag are tracked
3045    /// independently: the flag round-trips through the `settings` CF, and
3046    /// *enabling* the feature forces a reinit while the schema version is
3047    /// unchanged. *Disabling* it does not (that is handled in place).
3048    #[tokio::test]
3049    async fn settings_cf_round_trips_and_drives_reinit() {
3050        let temp_dir = tempfile::tempdir().unwrap();
3051        let db_path = temp_dir.path().join("rpc-index");
3052        let tables =
3053            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3054
3055        // Stamp a current-schema DB built with ledger history disabled.
3056        tables
3057            .meta
3058            .insert(
3059                &(),
3060                &MetadataInfo {
3061                    version: CURRENT_DB_VERSION,
3062                },
3063            )
3064            .unwrap();
3065        tables
3066            .settings
3067            .insert(
3068                &(),
3069                &encode_settings(&IndexSettings {
3070                    ledger_history_indexing: false,
3071                }),
3072            )
3073            .unwrap();
3074        assert!(!tables.persisted_ledger_history_indexing());
3075
3076        // An empty checkpoint store keeps the watermark check out of the way so
3077        // we isolate the schema-version / feature-toggle logic.
3078        let checkpoint_store = CheckpointStore::new_for_tests();
3079
3080        // Same schema, same feature setting => no reinit.
3081        assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3082        // Same schema, but the feature was toggled on => reinit.
3083        assert!(tables.needs_to_do_initialization(&checkpoint_store, true));
3084
3085        // Flip the persisted flag; it round-trips.
3086        tables
3087            .settings
3088            .insert(
3089                &(),
3090                &encode_settings(&IndexSettings {
3091                    ledger_history_indexing: true,
3092                }),
3093            )
3094            .unwrap();
3095        assert!(tables.persisted_ledger_history_indexing());
3096        // Already enabled => no reinit.
3097        assert!(!tables.needs_to_do_initialization(&checkpoint_store, true));
3098        // Disabling does NOT force a rebuild — the history CFs are dropped in
3099        // place instead.
3100        assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3101    }
3102
3103    /// `disable_ledger_history_indexing` empties every ledger-history CF (not
3104    /// all-but-the-last, the trap with end-exclusive range deletes) and clears
3105    /// the persisted flag, while leaving the base indexes untouched.
3106    #[tokio::test]
3107    async fn disable_ledger_history_indexing_drops_history_cfs_in_place() {
3108        let temp_dir = tempfile::tempdir().unwrap();
3109        let db_path = temp_dir.path().join("rpc-index");
3110        let tables =
3111            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3112
3113        // Seed a ledger-history DB with several rows in each history CF.
3114        tables
3115            .settings
3116            .insert(
3117                &(),
3118                &encode_settings(&IndexSettings {
3119                    ledger_history_indexing: true,
3120                }),
3121            )
3122            .unwrap();
3123        for tx_seq in 0..5u64 {
3124            tables
3125                .tx_seq_digest
3126                .insert(
3127                    &tx_seq,
3128                    &TxSeqDigestInfo {
3129                        digest: TransactionDigest::new([0; 32]),
3130                        event_count: 0,
3131                        tx_offset: 0,
3132                        checkpoint_number: 0,
3133                    },
3134                )
3135                .unwrap();
3136        }
3137        for bucket_id in 0..5u64 {
3138            let key = BitmapIndexKey {
3139                dimension_key: vec![1, 2, 3],
3140                bucket_id,
3141            };
3142            tables
3143                .transaction_bitmap
3144                .insert(&key, &BitmapBlob(vec![0xab]))
3145                .unwrap();
3146            tables
3147                .event_bitmap
3148                .insert(&key, &BitmapBlob(vec![0xcd]))
3149                .unwrap();
3150        }
3151        // A non-history CF that must survive the drop.
3152        tables.watermark.insert(&Watermark::Indexed, &42).unwrap();
3153
3154        tables.disable_ledger_history_indexing().unwrap();
3155
3156        // Every history row is gone — including the last key in each CF.
3157        assert!(tables.tx_seq_digest.is_empty());
3158        assert!(tables.transaction_bitmap.is_empty());
3159        assert!(tables.event_bitmap.is_empty());
3160        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), None);
3161
3162        // The flag is cleared and the base index is untouched.
3163        assert!(!tables.persisted_ledger_history_indexing());
3164        assert_eq!(tables.watermark.get(&Watermark::Indexed).unwrap(), Some(42));
3165    }
3166
3167    /// `prune()` advances `Watermark::Pruned` and deletes the exact
3168    /// tx_seq_digest range below the new tx-seq floor when enabled.
3169    #[tokio::test]
3170    async fn prune_maintains_ledger_history_state_when_active() {
3171        let temp_dir = tempfile::tempdir().unwrap();
3172        let db_path = temp_dir.path().join("rpc-index");
3173        let tables =
3174            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3175
3176        // Seed rows to simulate an indexed ledger history subsystem.
3177        let mut batch = tables.tx_seq_digest.batch();
3178        for tx_seq in 0..5u64 {
3179            batch
3180                .insert_batch(
3181                    &tables.tx_seq_digest,
3182                    [(
3183                        tx_seq,
3184                        TxSeqDigestInfo {
3185                            digest: TransactionDigest::new([0; 32]),
3186                            event_count: 0,
3187                            tx_offset: 0,
3188                            checkpoint_number: 0,
3189                        },
3190                    )],
3191                )
3192                .unwrap();
3193        }
3194        batch.write().unwrap();
3195
3196        // Prune cp 1 with an absolute tx-seq floor of 3 → rows 0..3 should
3197        // be deleted, the derived floor advances from 0 to 3.
3198        let pruning_atomic = AtomicU64::new(0);
3199        tables
3200            .prune(1, 3, /*ledger_history_enabled=*/ true, &pruning_atomic)
3201            .unwrap();
3202        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3203
3204        assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3205        for tx_seq in 0..3u64 {
3206            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3207        }
3208        for tx_seq in 3..5u64 {
3209            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3210        }
3211        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3212    }
3213
3214    /// When disabled, `prune` must advance only `Watermark::Pruned`.
3215    #[tokio::test]
3216    async fn prune_skips_ledger_history_state_when_inactive() {
3217        let temp_dir = tempfile::tempdir().unwrap();
3218        let db_path = temp_dir.path().join("rpc-index");
3219        let tables =
3220            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3221
3222        // Seed a tx_seq_digest row so we can confirm prune leaves it alone.
3223        tables
3224            .tx_seq_digest
3225            .insert(
3226                &0u64,
3227                &TxSeqDigestInfo {
3228                    digest: TransactionDigest::new([0; 32]),
3229                    event_count: 0,
3230                    tx_offset: 0,
3231                    checkpoint_number: 0,
3232                },
3233            )
3234            .unwrap();
3235
3236        let pruning_atomic = AtomicU64::new(0);
3237        tables
3238            .prune(
3239                5,
3240                3,
3241                /*ledger_history_enabled=*/ false,
3242                &pruning_atomic,
3243            )
3244            .unwrap();
3245        assert_eq!(
3246            pruning_atomic.load(Ordering::Relaxed),
3247            0,
3248            "disabled prune must not advance the compaction-filter atomic"
3249        );
3250        assert_eq!(
3251            tables.watermark.get(&Watermark::Pruned).unwrap(),
3252            Some(5),
3253            "base pruning must still advance"
3254        );
3255        assert!(
3256            tables.tx_seq_digest.get(&0u64).unwrap().is_some(),
3257            "tx_seq_digest rows must remain untouched when inactive"
3258        );
3259    }
3260
3261    /// Forward `index_checkpoint` writes ledger history rows only when enabled.
3262    #[tokio::test]
3263    async fn index_checkpoint_gates_on_ledger_history_enabled() {
3264        use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3265
3266        let temp_dir = tempfile::tempdir().unwrap();
3267        let db_path = temp_dir.path().join("rpc-index");
3268        let tables =
3269            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3270
3271        // Non-zero cp seq to skip the genesis path in `index_epoch` (which
3272        // expects a real system state object).
3273        let checkpoint = TestCheckpointBuilder::new(1)
3274            .start_transaction(1)
3275            .finish_transaction()
3276            .build_checkpoint();
3277
3278        // Disabled: no ledger history writes.
3279        let batch = tables
3280            .index_checkpoint(&checkpoint, /*ledger_history_enabled=*/ false)
3281            .expect("index_checkpoint failed");
3282        batch.write().expect("batch write failed");
3283        assert_eq!(tables.tx_seq_digest.safe_iter().count(), 0);
3284        assert_eq!(tables.transaction_bitmap.safe_iter().count(), 0);
3285        assert_eq!(tables.event_bitmap.safe_iter().count(), 0);
3286
3287        // Enabled → forward writes land.
3288        let checkpoint2 = TestCheckpointBuilder::new(2)
3289            .start_transaction(1)
3290            .finish_transaction()
3291            .build_checkpoint();
3292        let batch = tables
3293            .index_checkpoint(&checkpoint2, /*ledger_history_enabled=*/ true)
3294            .expect("index_checkpoint failed");
3295        batch.write().expect("batch write failed");
3296        assert!(
3297            tables.tx_seq_digest.safe_iter().count() > 0,
3298            "tx_seq_digest must have rows when ledger_history_enabled=true"
3299        );
3300    }
3301
3302    /// `tx_offset` records each transaction's zero-based position *within its
3303    /// checkpoint*, not its global `tx_sequence_number`, and restarts at 0 on
3304    /// every checkpoint boundary.
3305    #[tokio::test]
3306    async fn index_checkpoint_records_within_checkpoint_tx_offset() {
3307        use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3308
3309        let temp_dir = tempfile::tempdir().unwrap();
3310        let db_path = temp_dir.path().join("rpc-index");
3311        let tables =
3312            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3313
3314        // Checkpoint 1: three txs over a non-zero tx-seq base (100), so the
3315        // global tx_seqs are 100,101,102 — distinct from the offsets 0,1,2.
3316        let checkpoint1 = TestCheckpointBuilder::new(1)
3317            .with_network_total_transactions(100)
3318            .start_transaction(0)
3319            .finish_transaction()
3320            .start_transaction(1)
3321            .finish_transaction()
3322            .start_transaction(2)
3323            .finish_transaction()
3324            .build_checkpoint();
3325        tables
3326            .index_checkpoint(&checkpoint1, true)
3327            .expect("index_checkpoint failed")
3328            .write()
3329            .expect("batch write failed");
3330
3331        // Checkpoint 2: two more txs; global tx_seqs continue at 103,104 but the
3332        // offsets restart at 0.
3333        let checkpoint2 = TestCheckpointBuilder::new(2)
3334            .with_network_total_transactions(103)
3335            .start_transaction(0)
3336            .finish_transaction()
3337            .start_transaction(1)
3338            .finish_transaction()
3339            .build_checkpoint();
3340        tables
3341            .index_checkpoint(&checkpoint2, true)
3342            .expect("index_checkpoint failed")
3343            .write()
3344            .expect("batch write failed");
3345
3346        let offset_by_tx_seq: std::collections::BTreeMap<u64, u32> = tables
3347            .tx_seq_digest
3348            .safe_iter()
3349            .map(|row| row.map(|(tx_seq, info)| (tx_seq, info.tx_offset)))
3350            .collect::<Result<_, _>>()
3351            .unwrap();
3352
3353        assert_eq!(
3354            offset_by_tx_seq,
3355            std::collections::BTreeMap::from([(100, 0), (101, 1), (102, 2), (103, 0), (104, 1),]),
3356            "tx_offset must be the within-checkpoint position, not the global tx_seq"
3357        );
3358    }
3359
3360    /// `prune()` commits the tx_seq_digest range delete and the
3361    /// `Watermark::Pruned` advance in one atomic batch. After a successful
3362    /// prune both halves are present: no tx_seq_digest row remains in the
3363    /// deleted range, AND `Watermark::Pruned` is at the new value.
3364    #[tokio::test]
3365    async fn prune_commits_deletes_and_watermark_atomically() {
3366        let temp_dir = tempfile::tempdir().unwrap();
3367        let db_path = temp_dir.path().join("rpc-index");
3368        let tables =
3369            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3370
3371        let mut batch = tables.tx_seq_digest.batch();
3372        for tx_seq in 0..4u64 {
3373            batch
3374                .insert_batch(
3375                    &tables.tx_seq_digest,
3376                    [(
3377                        tx_seq,
3378                        TxSeqDigestInfo {
3379                            digest: TransactionDigest::new([0; 32]),
3380                            event_count: 0,
3381                            tx_offset: 0,
3382                            checkpoint_number: 0,
3383                        },
3384                    )],
3385                )
3386                .unwrap();
3387        }
3388        batch.write().unwrap();
3389
3390        let pruning_atomic = AtomicU64::new(0);
3391        tables
3392            .prune(1, 2, /*ledger_history_enabled=*/ true, &pruning_atomic)
3393            .unwrap();
3394
3395        // After the single atomic batch lands, both halves are present: the
3396        // deleted rows AND the advanced `Watermark::Pruned`.
3397        assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3398        for tx_seq in 0..2u64 {
3399            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3400        }
3401        for tx_seq in 2..4u64 {
3402            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3403        }
3404        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(2));
3405        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 2);
3406    }
3407
3408    /// Replaying a prune with an unchanged floor is a no-op: the second call
3409    /// does not advance the compaction-filter atomic and leaves rows +
3410    /// watermark untouched. Covers crash-replay where the pruner re-issues the
3411    /// same prune.
3412    #[tokio::test]
3413    async fn prune_idempotent_replay() {
3414        let temp_dir = tempfile::tempdir().unwrap();
3415        let db_path = temp_dir.path().join("rpc-index");
3416        let tables =
3417            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3418
3419        let mut batch = tables.tx_seq_digest.batch();
3420        for tx_seq in 0..5u64 {
3421            batch
3422                .insert_batch(
3423                    &tables.tx_seq_digest,
3424                    [(
3425                        tx_seq,
3426                        TxSeqDigestInfo {
3427                            digest: TransactionDigest::new([0; 32]),
3428                            event_count: 0,
3429                            tx_offset: 0,
3430                            checkpoint_number: 0,
3431                        },
3432                    )],
3433                )
3434                .unwrap();
3435        }
3436        batch.write().unwrap();
3437
3438        let pruning_atomic = AtomicU64::new(0);
3439        tables.prune(1, 3, true, &pruning_atomic).unwrap();
3440        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3441        // Same floor again: nothing new to delete, floor already at 3.
3442        tables.prune(1, 3, true, &pruning_atomic).unwrap();
3443        assert_eq!(
3444            pruning_atomic.load(Ordering::Relaxed),
3445            3,
3446            "replay must not move the atomic"
3447        );
3448        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3449        for tx_seq in 3..5u64 {
3450            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3451        }
3452    }
3453
3454    /// Consecutive prunes advance the floor across an existing range tombstone:
3455    /// the second prune derives `prev_exclusive` from `first_tx_seq_digest_key`,
3456    /// which must read *through* the first tombstone (only possible because the
3457    /// CF is opened with `ignore_range_deletions = false`).
3458    #[tokio::test]
3459    async fn prune_consecutive_ranges_advance_floor() {
3460        let temp_dir = tempfile::tempdir().unwrap();
3461        let db_path = temp_dir.path().join("rpc-index");
3462        let tables =
3463            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3464
3465        let mut batch = tables.tx_seq_digest.batch();
3466        for tx_seq in 0..6u64 {
3467            batch
3468                .insert_batch(
3469                    &tables.tx_seq_digest,
3470                    [(
3471                        tx_seq,
3472                        TxSeqDigestInfo {
3473                            digest: TransactionDigest::new([0; 32]),
3474                            event_count: 0,
3475                            tx_offset: 0,
3476                            checkpoint_number: 0,
3477                        },
3478                    )],
3479                )
3480                .unwrap();
3481        }
3482        batch.write().unwrap();
3483
3484        let pruning_atomic = AtomicU64::new(0);
3485        tables.prune(1, 3, true, &pruning_atomic).unwrap();
3486        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3487        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3488        // Second prune must see floor 3 (through the tombstone) and extend it to 5.
3489        tables.prune(2, 5, true, &pruning_atomic).unwrap();
3490        assert_eq!(pruning_atomic.load(Ordering::Relaxed), 5);
3491        for tx_seq in 0..5u64 {
3492            assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3493        }
3494        assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(5));
3495    }
3496
3497    /// `new_without_init` must honor an already-built ledger history DB.
3498    #[tokio::test]
3499    async fn new_without_init_enables_ledger_history_for_db_with_ledger_history_setting() {
3500        let temp_dir = tempfile::tempdir().unwrap();
3501        let db_path = temp_dir.path().join("rpc-index");
3502
3503        // Seed a DB built with ledger history indexing.
3504        {
3505            let tables =
3506                IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3507            tables
3508                .meta
3509                .insert(
3510                    &(),
3511                    &MetadataInfo {
3512                        version: CURRENT_DB_VERSION,
3513                    },
3514                )
3515                .unwrap();
3516            tables
3517                .settings
3518                .insert(
3519                    &(),
3520                    &encode_settings(&IndexSettings {
3521                        ledger_history_indexing: true,
3522                    }),
3523                )
3524                .unwrap();
3525            let mut batch = tables.tx_seq_digest.batch();
3526            for tx_seq in 100..105u64 {
3527                batch
3528                    .insert_batch(
3529                        &tables.tx_seq_digest,
3530                        [(
3531                            tx_seq,
3532                            TxSeqDigestInfo {
3533                                digest: TransactionDigest::new([0; 32]),
3534                                event_count: 0,
3535                                tx_offset: 0,
3536                                checkpoint_number: 0,
3537                            },
3538                        )],
3539                    )
3540                    .unwrap();
3541            }
3542            batch.write().unwrap();
3543        }
3544
3545        let store = RpcIndexStore::new_without_init(temp_dir.path());
3546        assert!(
3547            store.ledger_history_enabled,
3548            "new_without_init on a ledger-history DB must enable ledger history indexing"
3549        );
3550        let atomic = &store.ledger_history_pruning_watermark;
3551        assert_eq!(
3552            atomic.load(Ordering::Relaxed),
3553            100,
3554            "pruning atomic must be hydrated from the first tx_seq_digest key"
3555        );
3556
3557        // Pruning through tx-seq floor 103 deletes rows [100, 103).
3558        store.prune(7, 103).unwrap();
3559
3560        for tx_seq in 100..103u64 {
3561            assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3562        }
3563        for tx_seq in 103..105u64 {
3564            assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3565        }
3566        assert_eq!(
3567            store.tables.first_tx_seq_digest_key().unwrap(),
3568            Some(103),
3569            "prune must advance the derived tx-seq floor"
3570        );
3571        assert_eq!(
3572            atomic.load(Ordering::Relaxed),
3573            103,
3574            "prune must advance the compaction-filter atomic"
3575        );
3576    }
3577
3578    /// A DB without the ledger-history setting stays disabled in `new_without_init`.
3579    #[tokio::test]
3580    async fn new_without_init_disables_ledger_history_for_db_without_ledger_history_setting() {
3581        // Case 1: fresh/empty DB.
3582        let temp_dir = tempfile::tempdir().unwrap();
3583        let store = RpcIndexStore::new_without_init(temp_dir.path());
3584        assert!(
3585            !store.ledger_history_enabled,
3586            "new_without_init on a fresh DB must leave ledger history indexing disabled"
3587        );
3588
3589        store.prune(5, 0).unwrap();
3590        assert_eq!(
3591            store.tables.watermark.get(&Watermark::Pruned).unwrap(),
3592            Some(5)
3593        );
3594        assert_eq!(
3595            store.tables.first_tx_seq_digest_key().unwrap(),
3596            None,
3597            "disabled ledger history indexing must leave tx_seq_digest untouched"
3598        );
3599
3600        // Case 2: a current-schema DB with no `settings` row (e.g. a pre-feature
3601        // DB). The missing row must read as ledger history disabled.
3602        let temp_dir = tempfile::tempdir().unwrap();
3603        let db_path = temp_dir.path().join("rpc-index");
3604        {
3605            let tables =
3606                IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3607            tables
3608                .meta
3609                .insert(
3610                    &(),
3611                    &MetadataInfo {
3612                        version: CURRENT_DB_VERSION,
3613                    },
3614                )
3615                .unwrap();
3616        }
3617        let store = RpcIndexStore::new_without_init(temp_dir.path());
3618        assert!(
3619            !store.ledger_history_enabled,
3620            "new_without_init on a DB with no settings row must leave ledger history indexing disabled"
3621        );
3622    }
3623
3624    /// Parse the newest `OPTIONS-NNNNNN` file in `db_path` into a map keyed by
3625    /// column family name. RocksDB writes one such file on every open with a
3626    /// section per CF in INI-like format.
3627    fn parse_cf_options(db_path: &Path) -> HashMap<String, HashMap<String, String>> {
3628        let mut options_file: Option<(u64, PathBuf)> = None;
3629        for entry in std::fs::read_dir(db_path).expect("read_dir failed") {
3630            let entry = entry.unwrap();
3631            let name = entry.file_name().to_string_lossy().into_owned();
3632            let Some(rest) = name.strip_prefix("OPTIONS-") else {
3633                continue;
3634            };
3635            // Skip transient files like `OPTIONS-NNNNNN.dbtmp`.
3636            let Ok(seq) = rest.parse::<u64>() else {
3637                continue;
3638            };
3639            if options_file.as_ref().is_none_or(|(s, _)| seq > *s) {
3640                options_file = Some((seq, entry.path()));
3641            }
3642        }
3643        let (_, path) = options_file.expect("no OPTIONS-* file written");
3644        let content = std::fs::read_to_string(&path).expect("read OPTIONS failed");
3645
3646        let mut result: HashMap<String, HashMap<String, String>> = HashMap::new();
3647        let mut current_cf: Option<String> = None;
3648        for line in content.lines() {
3649            let line = line.trim();
3650            if let Some(rest) = line.strip_prefix("[CFOptions \"") {
3651                let cf_name = rest.trim_end_matches("\"]").to_string();
3652                current_cf = Some(cf_name);
3653            } else if line.starts_with('[') {
3654                // Any other section ends the CFOptions block.
3655                current_cf = None;
3656            } else if let Some(cf) = current_cf.as_ref()
3657                && let Some((k, v)) = line.split_once('=')
3658            {
3659                result
3660                    .entry(cf.clone())
3661                    .or_default()
3662                    .insert(k.trim().to_string(), v.trim().to_string());
3663            }
3664        }
3665        result
3666    }
3667}