sui_core/
jsonrpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! IndexStore supports creation of various ancillary indexes of state in SuiDataStore.
5//! The main user of this data is the explorer.
6
7use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19    register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use sui_types::accumulator_event::AccumulatorEvent;
23use typed_store::TypedStoreError;
24use typed_store::rocksdb::compaction_filter::Decision;
25
26use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
27use sui_storage::mutex_table::MutexTable;
28use sui_storage::sharded_lru::ShardedLruCache;
29use sui_types::base_types::{
30    ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
31};
32use sui_types::base_types::{ObjectInfo, ObjectRef};
33use sui_types::digests::TransactionEventsDigest;
34use sui_types::dynamic_field::{self, DynamicFieldInfo};
35use sui_types::effects::TransactionEvents;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
37use sui_types::inner_temporary_store::TxCoins;
38use sui_types::object::{Object, Owner};
39use sui_types::parse_sui_struct_tag;
40use sui_types::storage::error::Error as StorageError;
41use tracing::{debug, info, instrument, trace};
42use typed_store::DBMapUtils;
43use typed_store::rocks::{
44    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, StagedBatch, default_db_options,
45    read_size_from_env,
46};
47use typed_store::traits::Map;
48
49type OwnerIndexKey = (SuiAddress, ObjectID);
50type DynamicFieldKey = (ObjectID, ObjectID);
51type EventId = (TxSequenceNumber, usize);
52type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
53type AllBalance = HashMap<TypeTag, TotalBalance>;
54
/// Key of the `coin_index_2` table.
///
/// The derived `Ord` compares fields in declaration order: `owner`, then `coin_type`, then
/// `inverted_balance`, then `object_id`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CoinIndexKey2 {
    /// Address the coin is indexed under.
    pub owner: SuiAddress,
    /// Coin type, kept as a string.
    pub coin_type: String,
    // the balance of the coin inverted `!coin.balance` in order to force sorting of coins to be
    // from greatest to least
    pub inverted_balance: u64,
    /// Object id of the coin; the last component of the key.
    pub object_id: ObjectID,
}
64
65impl CoinIndexKey2 {
66    pub fn new_from_cursor(
67        owner: SuiAddress,
68        coin_type: String,
69        inverted_balance: u64,
70        object_id: ObjectID,
71    ) -> Self {
72        Self {
73            owner,
74            coin_type,
75            inverted_balance,
76            object_id,
77        }
78    }
79
80    pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
81        Self {
82            owner,
83            coin_type,
84            inverted_balance: !balance,
85            object_id,
86        }
87    }
88}
89
/// Database version written into `MetadataInfo::version` when `IndexStoreTables::init` finds
/// no stored metadata.
const CURRENT_DB_VERSION: u64 = 0;
// NOTE(review): underscore-prefixed and not referenced in the visible part of this file —
// presumably the version of the coin index column family; confirm before relying on it.
const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
92
/// Value stored in the `meta` singleton table of `IndexStoreTables`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
    /// Version of each of the column families
    ///
    /// This is used to version individual column families to determine if a CF needs to be
    /// (re)initialized on startup.
    column_families: BTreeMap<String, ColumnFamilyInfo>,
}
103
/// Per-column-family record kept in `MetadataInfo::column_families`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ColumnFamilyInfo {
    /// Version of this column family.
    version: u64,
}
108
// NOTE(review): not used in the visible part of this file — presumably the largest
// transaction range a single query may cover; confirm against callers.
pub const MAX_TX_RANGE_SIZE: u64 = 4096;

// NOTE(review): not used in the visible part of this file — presumably a cap on the number
// of owned objects returned per request; confirm against callers.
pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
/// Environment variable read by `coin_index_table_default_config` to size the `coin_index_2`
/// read optimization; a default of `5 * 1024` is used when it yields no value.
const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
// NOTE(review): the two variables below are consumed outside the visible part of this file;
// judging by their names they toggle the balance caches and their update strategy — confirm.
const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
115
/// Aggregated balance figures; the value type held by the balance caches in
/// `IndexStoreCaches` (per `(SuiAddress, TypeTag)` and, via `AllBalance`, per `TypeTag`).
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
pub struct TotalBalance {
    /// Summed balance. Signed and wider than a single coin's `u64` balance.
    pub balance: i128,
    /// Number of coins behind `balance`. Signed.
    // NOTE(review): the signed types suggest this struct also carries deltas — confirm.
    pub num_coins: i64,
    // NOTE(review): presumably the portion held as an address balance rather than in coin
    // objects (cf. the `address_balances` table) — confirm.
    pub address_balance: u64,
}
122
/// A set of removals and insertions, expressed in the key/value types of the `owner_index`
/// and `dynamic_field_index` tables.
#[derive(Debug)]
pub struct ObjectIndexChanges {
    /// Owner-index keys to remove.
    pub deleted_owners: Vec<OwnerIndexKey>,
    /// Dynamic-field-index keys to remove.
    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
    /// Owner-index entries to insert.
    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
    /// Dynamic-field-index entries to insert.
    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
}
130
/// Value stored in the `coin_index_2` table; a snapshot of a coin object's reference data
/// and balance (see `CoinInfo::from_object`).
#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct CoinInfo {
    /// Version of the coin object.
    pub version: SequenceNumber,
    /// Digest of the coin object.
    pub digest: ObjectDigest,
    /// Coin value (not inverted, unlike `CoinIndexKey2::inverted_balance`).
    pub balance: u64,
    /// The object's `previous_transaction` digest.
    pub previous_transaction: TransactionDigest,
}
138
139impl CoinInfo {
140    pub fn from_object(object: &Object) -> Option<CoinInfo> {
141        object.as_coin_maybe().map(|coin| CoinInfo {
142            version: object.version(),
143            digest: object.digest(),
144            previous_transaction: object.previous_transaction,
145            balance: coin.value(),
146        })
147    }
148}
149
/// Prometheus counters for balance lookups, registered by `IndexStoreMetrics::new`.
// NOTE(review): the increment sites are outside the visible part of this file; going by the
// metric names, the `*_from_db` counters track lookups that hit the database and the
// `*_from_total` counters track all lookups — confirm.
pub struct IndexStoreMetrics {
    /// Registered as `balance_lookup_from_db`.
    balance_lookup_from_db: IntCounter,
    /// Registered as `balance_lookup_from_total`.
    balance_lookup_from_total: IntCounter,
    /// Registered as `all_balance_lookup_from_db`.
    all_balance_lookup_from_db: IntCounter,
    /// Registered as `all_balance_lookup_from_total`.
    all_balance_lookup_from_total: IntCounter,
}
156
157impl IndexStoreMetrics {
158    pub fn new(registry: &Registry) -> IndexStoreMetrics {
159        Self {
160            balance_lookup_from_db: register_int_counter_with_registry!(
161                "balance_lookup_from_db",
162                "Total number of balance requests served from cache",
163                registry,
164            )
165            .unwrap(),
166            balance_lookup_from_total: register_int_counter_with_registry!(
167                "balance_lookup_from_total",
168                "Total number of balance requests served ",
169                registry,
170            )
171            .unwrap(),
172            all_balance_lookup_from_db: register_int_counter_with_registry!(
173                "all_balance_lookup_from_db",
174                "Total number of all balance requests served from cache",
175                registry,
176            )
177            .unwrap(),
178            all_balance_lookup_from_total: register_int_counter_with_registry!(
179                "all_balance_lookup_from_total",
180                "Total number of all balance requests served",
181                registry,
182            )
183            .unwrap(),
184        }
185    }
186}
187
/// In-memory caches of balance lookups, plus a per-address lock table.
pub struct IndexStoreCaches {
    /// Cached lookup result (success or error) for one `(address, coin type)` pair.
    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
    /// Cached lookup result (success or error) holding every coin type's balance for an
    /// address.
    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
    /// Mutexes keyed by address.
    // NOTE(review): presumably these serialize cache updates per address — confirm.
    pub locks: MutexTable<SuiAddress>,
}
193
/// A `parking_lot` mutex guard obtained through an `Arc` (`ArcMutexGuard`), so it carries no
/// borrow of the mutex it locks.
type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
195
/// Cache updates with optional locks held. Returned from `index_tx`.
/// In sync mode, locks are acquired and held until the batch is committed.
/// In async mode, locks are None and this is converted to `IndexStoreCacheUpdates`
/// via `into_inner()` before sending across threads.
pub struct IndexStoreCacheUpdatesWithLocks {
    /// Guards kept alive for as long as this value lives; never read, only dropped.
    pub(crate) _locks: Option<Vec<OwnedMutexGuard<()>>>,
    /// The cache updates themselves.
    pub(crate) inner: IndexStoreCacheUpdates,
}
204
205impl IndexStoreCacheUpdatesWithLocks {
206    pub fn into_inner(self) -> IndexStoreCacheUpdates {
207        self.inner
208    }
209}
210
/// Send-safe cache updates without locks, used in the async post-processing channel
/// and by `commit_index_batch`.
#[derive(Default)]
pub struct IndexStoreCacheUpdates {
    /// New values keyed like `IndexStoreCaches::per_coin_type_balance`.
    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    /// New values keyed like `IndexStoreCaches::all_balances`.
    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
}
218
/// The tables (column families) that make up the index database.
#[derive(DBMapUtils)]
pub struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables could still be empty post
    ///   initialization)
    /// - version of each column family and their respective initialization status
    meta: DBMap<(), MetadataInfo>,

    /// Index from sui address to transactions initiated by that address.
    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    /// Index from sui address to transactions that were sent to that address.
    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    /// Index from object id to transactions that used that object id as input.
    #[deprecated]
    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// Index from object id to transactions that modified/created that object id.
    #[deprecated]
    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// Index from package id, module and function identifier to transactions that used that
    /// Move function call as input.
    transactions_by_move_function:
        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,

    /// Ordering of all indexed transactions.
    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,

    /// Index from transaction digest to sequence number.
    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,

    /// This is an index of object references to currently existing objects, indexed by the
    /// composite key of the SuiAddress of their owner and the object ID of the object.
    /// This composite index allows an efficient iterator to list all objects currently owned
    /// by a specific user, and their object reference.
    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

    /// Coins keyed by `CoinIndexKey2`, i.e. ordered by owner, coin type, descending balance
    /// and object id.
    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
    // Simple index that just tracks the existence of an address balance for an address.
    address_balances: DBMap<(SuiAddress, TypeTag), ()>,

    /// This is an index of object references to currently existing dynamic field object, indexed by the
    /// composite key of the object ID of their parent and the object ID of the dynamic field object.
    /// This composite index allows an efficient iterator to list all objects currently owned
    /// by a specific object, and their object reference.
    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,

    /// All indexed events, keyed by event id.
    event_order: DBMap<EventId, EventIndex>,
    // NOTE(review): `event_by_move_module` and `event_by_event_module` share the same key
    // type; which module each one refers to is decided where they are populated, outside the
    // visible part of this file — confirm before documenting further.
    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
    /// Events keyed by a `StructTag` and event id.
    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
    /// Events keyed by a `SuiAddress` and event id.
    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
    /// Events keyed by a `u64` timestamp (milliseconds) and event id.
    event_by_time: DBMap<(u64, EventId), EventIndex>,

    /// Singleton holding a transaction sequence number.
    // NOTE(review): presumably the persisted counterpart of the in-memory watermark used by
    // the compaction filters — confirm.
    pruner_watermark: DBMap<(), TxSequenceNumber>,
}
278
279impl IndexStoreTables {
    /// Returns a reference to the `owner_index` table.
    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
        &self.owner_index
    }
283
    /// Returns a reference to the coin index, which is backed by the `coin_index_2` table.
    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
        &self.coin_index_2
    }
287
288    pub fn check_databases_equal(&self, other: &IndexStoreTables) {
289        fn assert_tables_equal<K, V>(name: &str, table_a: &DBMap<K, V>, table_b: &DBMap<K, V>)
290        where
291            K: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
292            V: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
293        {
294            let mut iter_a = table_a.safe_iter();
295            let mut iter_b = table_b.safe_iter();
296            let mut count = 0u64;
297            loop {
298                match (iter_a.next(), iter_b.next()) {
299                    (Some(a), Some(b)) => {
300                        let (ka, va): (K, V) = a.expect("failed to read from table_a");
301                        let (kb, vb): (K, V) = b.expect("failed to read from table_b");
302                        assert!(
303                            ka == kb && va == vb,
304                            "{name}: mismatch at entry {count}:\n  a=({ka:?}, {va:?})\n  b=({kb:?}, {vb:?})"
305                        );
306                        count += 1;
307                    }
308                    (None, None) => break,
309                    (Some(_), None) => {
310                        panic!(
311                            "{name}: table_a has more entries than table_b (diverged after {count} entries)"
312                        );
313                    }
314                    (None, Some(_)) => {
315                        panic!(
316                            "{name}: table_b has more entries than table_a (diverged after {count} entries)"
317                        );
318                    }
319                }
320            }
321            info!("{name}: verified {count} entries are identical");
322        }
323
324        // Tables keyed by TxSequenceNumber may have different sequence numbers between
325        // async and sync post-processing. Compare the (prefix, value) pairs as sorted
326        // vectors instead of lockstep iteration.
327        // Crash recovery can re-index transactions with new sequence numbers while
328        // old entries remain, producing duplicates. Deduplicate before comparing.
329        fn assert_seq_table_equal<P, V>(
330            name: &str,
331            table_a: &DBMap<(P, TxSequenceNumber), V>,
332            table_b: &DBMap<(P, TxSequenceNumber), V>,
333        ) where
334            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
335            V: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
336        {
337            let mut entries_a: Vec<(P, V)> = table_a
338                .safe_iter()
339                .map(|r| {
340                    let ((p, _seq), v) = r.expect("failed to read from table_a");
341                    (p, v)
342                })
343                .collect();
344            let mut entries_b: Vec<(P, V)> = table_b
345                .safe_iter()
346                .map(|r| {
347                    let ((p, _seq), v) = r.expect("failed to read from table_b");
348                    (p, v)
349                })
350                .collect();
351            entries_a.sort();
352            entries_a.dedup();
353            entries_b.sort();
354            entries_b.dedup();
355            assert!(
356                entries_a.len() == entries_b.len(),
357                "{name}: different number of unique entries: {} vs {}",
358                entries_a.len(),
359                entries_b.len()
360            );
361            for (i, (a, b)) in entries_a.iter().zip(entries_b.iter()).enumerate() {
362                assert!(
363                    a == b,
364                    "{name}: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
365                );
366            }
367            info!(
368                "{name}: verified {} unique entries match (ignoring sequence numbers)",
369                entries_a.len()
370            );
371        }
372
373        // EventIndex contains a wall-clock timestamp that differs between nodes.
374        // Compare only (TransactionEventsDigest, TransactionDigest).
375        type EventKey = (TransactionEventsDigest, TransactionDigest);
376        fn strip_timestamp(ei: EventIndex) -> EventKey {
377            (ei.0, ei.1)
378        }
379
380        // Tables where the key contains an EventId = (TxSequenceNumber, usize). Compare
381        // the (prefix, event_key) pairs as sorted vectors, ignoring both event ids and
382        // timestamps.
383        fn assert_event_table_equal<P>(
384            name: &str,
385            table_a: &DBMap<(P, EventId), EventIndex>,
386            table_b: &DBMap<(P, EventId), EventIndex>,
387        ) where
388            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
389        {
390            let mut entries_a: Vec<(P, EventKey)> = table_a
391                .safe_iter()
392                .map(|r| {
393                    let ((p, _eid), v) = r.expect("failed to read from table_a");
394                    (p, strip_timestamp(v))
395                })
396                .collect();
397            let mut entries_b: Vec<(P, EventKey)> = table_b
398                .safe_iter()
399                .map(|r| {
400                    let ((p, _eid), v) = r.expect("failed to read from table_b");
401                    (p, strip_timestamp(v))
402                })
403                .collect();
404            entries_a.sort();
405            entries_a.dedup();
406            entries_b.sort();
407            entries_b.dedup();
408            assert!(
409                entries_a.len() == entries_b.len(),
410                "{name}: different number of unique entries: {} vs {}",
411                entries_a.len(),
412                entries_b.len()
413            );
414            for (i, (a, b)) in entries_a.iter().zip(entries_b.iter()).enumerate() {
415                assert!(
416                    a == b,
417                    "{name}: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
418                );
419            }
420            info!(
421                "{name}: verified {} unique entries match (ignoring event ids and timestamps)",
422                entries_a.len()
423            );
424        }
425
426        // Exact comparison — tables with no sequence numbers in keys.
427        assert_tables_equal("owner_index", &self.owner_index, &other.owner_index);
428        assert_tables_equal("coin_index_2", &self.coin_index_2, &other.coin_index_2);
429        assert_tables_equal(
430            "address_balances",
431            &self.address_balances,
432            &other.address_balances,
433        );
434        assert_tables_equal(
435            "dynamic_field_index",
436            &self.dynamic_field_index,
437            &other.dynamic_field_index,
438        );
439
440        // Transaction tables — compare (address, digest) pairs ignoring sequence numbers.
441        assert_seq_table_equal(
442            "transactions_from_addr",
443            &self.transactions_from_addr,
444            &other.transactions_from_addr,
445        );
446        assert_seq_table_equal(
447            "transactions_to_addr",
448            &self.transactions_to_addr,
449            &other.transactions_to_addr,
450        );
451        // transactions_by_move_function has a 4-tuple key: (ObjectID, String, String, TxSequenceNumber)
452        {
453            let mut entries_a: Vec<((ObjectID, String, String), TransactionDigest)> = self
454                .transactions_by_move_function
455                .safe_iter()
456                .map(|r| {
457                    let ((pkg, module, func, _seq), digest) =
458                        r.expect("failed to read from table_a");
459                    ((pkg, module, func), digest)
460                })
461                .collect();
462            let mut entries_b: Vec<((ObjectID, String, String), TransactionDigest)> = other
463                .transactions_by_move_function
464                .safe_iter()
465                .map(|r| {
466                    let ((pkg, module, func, _seq), digest) =
467                        r.expect("failed to read from table_b");
468                    ((pkg, module, func), digest)
469                })
470                .collect();
471            entries_a.sort();
472            entries_a.dedup();
473            entries_b.sort();
474            entries_b.dedup();
475            assert!(
476                entries_a.len() == entries_b.len(),
477                "transactions_by_move_function: different number of unique entries: {} vs {}",
478                entries_a.len(),
479                entries_b.len()
480            );
481            for (i, (a, b)) in entries_a.iter().zip(entries_b.iter()).enumerate() {
482                assert!(
483                    a == b,
484                    "transactions_by_move_function: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
485                );
486            }
487            info!(
488                "transactions_by_move_function: verified {} entries match (ignoring sequence numbers)",
489                entries_a.len()
490            );
491        }
492
493        // Event tables — compare event keys ignoring event ids and timestamps.
494        {
495            // event_order: DBMap<EventId, EventIndex> — no prefix, just compare values.
496            let mut vals_a: Vec<EventKey> = self
497                .event_order
498                .safe_iter()
499                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
500                .collect();
501            let mut vals_b: Vec<EventKey> = other
502                .event_order
503                .safe_iter()
504                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
505                .collect();
506            vals_a.sort();
507            vals_a.dedup();
508            vals_b.sort();
509            vals_b.dedup();
510            assert!(
511                vals_a.len() == vals_b.len(),
512                "event_order: different number of entries: {} vs {}",
513                vals_a.len(),
514                vals_b.len()
515            );
516            for (i, (a, b)) in vals_a.iter().zip(vals_b.iter()).enumerate() {
517                assert!(
518                    a == b,
519                    "event_order: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
520                );
521            }
522            info!(
523                "event_order: verified {} entries match (ignoring event ids and timestamps)",
524                vals_a.len()
525            );
526        }
527        assert_event_table_equal(
528            "event_by_move_module",
529            &self.event_by_move_module,
530            &other.event_by_move_module,
531        );
532        assert_event_table_equal(
533            "event_by_move_event",
534            &self.event_by_move_event,
535            &other.event_by_move_event,
536        );
537        assert_event_table_equal(
538            "event_by_event_module",
539            &self.event_by_event_module,
540            &other.event_by_event_module,
541        );
542        assert_event_table_equal(
543            "event_by_sender",
544            &self.event_by_sender,
545            &other.event_by_sender,
546        );
547        // event_by_time: key is (timestamp_ms, EventId) — timestamps differ between
548        // nodes, so just compare the set of EventKey values.
549        {
550            let mut vals_a: Vec<EventKey> = self
551                .event_by_time
552                .safe_iter()
553                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
554                .collect();
555            let mut vals_b: Vec<EventKey> = other
556                .event_by_time
557                .safe_iter()
558                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
559                .collect();
560            vals_a.sort();
561            vals_a.dedup();
562            vals_b.sort();
563            vals_b.dedup();
564            assert!(
565                vals_a.len() == vals_b.len(),
566                "event_by_time: different number of entries: {} vs {}",
567                vals_a.len(),
568                vals_b.len()
569            );
570            for (i, (a, b)) in vals_a.iter().zip(vals_b.iter()).enumerate() {
571                assert!(
572                    a == b,
573                    "event_by_time: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
574                );
575            }
576            info!(
577                "event_by_time: verified {} entries match (ignoring timestamps and event ids)",
578                vals_a.len()
579            );
580        }
581
582        // Skipped tables:
583        // - transaction_order / transactions_seq: sequence numbers differ by design
584        // - transactions_by_input_object_id / transactions_by_mutated_object_id: deprecated
585        // - meta: metadata singleton, not meaningful to compare
586        // - pruner_watermark: operational state
587    }
588
589    #[allow(deprecated)]
590    fn init(&mut self) -> Result<(), StorageError> {
591        let metadata = {
592            match self.meta.get(&()) {
593                Ok(Some(metadata)) => metadata,
594                Ok(None) | Err(_) => MetadataInfo {
595                    version: CURRENT_DB_VERSION,
596                    column_families: BTreeMap::new(),
597                },
598            }
599        };
600
601        // Commit to the DB that the indexes have been initialized
602        self.meta.insert(&(), &metadata)?;
603
604        Ok(())
605    }
606
607    pub fn get_dynamic_fields_iterator(
608        &self,
609        object: ObjectID,
610        cursor: Option<ObjectID>,
611    ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
612        debug!(?object, "get_dynamic_fields");
613        // The object id 0 is the smallest possible
614        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
615        let iter_upper_bound = (object, ObjectID::MAX);
616        self.dynamic_field_index
617            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
618            // skip an extra b/c the cursor is exclusive
619            .skip(usize::from(cursor.is_some()))
620            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
621            .map_ok(|((_, c), object_info)| (c, object_info))
622    }
623}
624
/// The index store: the on-disk tables plus in-memory caches, metrics and pruning state.
// NOTE(review): most fields are used outside the visible part of this file; the hedged
// notes below are inferred from names and types and should be confirmed.
pub struct IndexStore {
    // presumably the next `TxSequenceNumber` to hand out when indexing a transaction
    next_sequence_number: AtomicU64,
    /// The underlying tables.
    tables: IndexStoreTables,
    /// Balance caches and per-address locks.
    pub caches: IndexStoreCaches,
    /// Balance-lookup counters.
    metrics: Arc<IndexStoreMetrics>,
    // presumably an upper bound on the length of type strings accepted by the index
    max_type_length: u64,
    // presumably controls whether the `#[deprecated]` tables are removed
    remove_deprecated_tables: bool,
    /// Watermark shared with the compaction filters built by `compaction_filter_config`,
    /// which drop entries whose sequence number is below it.
    pruner_watermark: Arc<AtomicU64>,
}
634
/// Counters updated by the compaction filters built in `compaction_filter_config`; each is
/// labelled with the column family name (`cf`).
struct JsonRpcCompactionMetrics {
    /// Entries the filter decided to remove.
    key_removed: IntCounterVec,
    /// Entries the filter decided to keep.
    key_kept: IntCounterVec,
    /// Entries that failed to deserialize (these are kept).
    key_error: IntCounterVec,
}
640
641impl JsonRpcCompactionMetrics {
642    pub fn new(registry: &Registry) -> Arc<Self> {
643        Arc::new(Self {
644            key_removed: register_int_counter_vec_with_registry!(
645                "json_rpc_compaction_filter_key_removed",
646                "Compaction key removed",
647                &["cf"],
648                registry
649            )
650            .unwrap(),
651            key_kept: register_int_counter_vec_with_registry!(
652                "json_rpc_compaction_filter_key_kept",
653                "Compaction key kept",
654                &["cf"],
655                registry
656            )
657            .unwrap(),
658            key_error: register_int_counter_vec_with_registry!(
659                "json_rpc_compaction_filter_key_error",
660                "Compaction error",
661                &["cf"],
662                registry
663            )
664            .unwrap(),
665        })
666    }
667}
668
669fn compaction_filter_config<T: DeserializeOwned>(
670    name: &str,
671    metrics: Arc<JsonRpcCompactionMetrics>,
672    mut db_options: DBOptions,
673    pruner_watermark: Arc<AtomicU64>,
674    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
675    by_key: bool,
676) -> DBOptions {
677    let cf = name.to_string();
678    db_options
679        .options
680        .set_compaction_filter(name, move |_, key, value| {
681            let bytes = if by_key { key } else { value };
682            let deserializer = bincode::DefaultOptions::new()
683                .with_big_endian()
684                .with_fixint_encoding();
685            match deserializer.deserialize(bytes) {
686                Ok(key_data) => {
687                    let sequence_number = extractor(key_data);
688                    if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
689                        metrics.key_removed.with_label_values(&[&cf]).inc();
690                        Decision::Remove
691                    } else {
692                        metrics.key_kept.with_label_values(&[&cf]).inc();
693                        Decision::Keep
694                    }
695                }
696                Err(_) => {
697                    metrics.key_error.with_label_values(&[&cf]).inc();
698                    Decision::Keep
699                }
700            }
701        });
702    db_options
703}
704
705fn compaction_filter_config_by_key<T: DeserializeOwned>(
706    name: &str,
707    metrics: Arc<JsonRpcCompactionMetrics>,
708    db_options: DBOptions,
709    pruner_watermark: Arc<AtomicU64>,
710    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
711) -> DBOptions {
712    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
713}
714
715fn coin_index_table_default_config() -> DBOptions {
716    default_db_options()
717        .optimize_for_write_throughput()
718        .optimize_for_read(
719            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
720        )
721        .disable_write_throttling()
722}
723
724impl IndexStore {
725    pub fn new_without_init(
726        path: PathBuf,
727        registry: &Registry,
728        max_type_length: Option<u64>,
729        remove_deprecated_tables: bool,
730    ) -> Self {
731        let db_options = default_db_options().disable_write_throttling();
732        let pruner_watermark = Arc::new(AtomicU64::new(0));
733        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
734        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
735            (
736                "transactions_from_addr".to_string(),
737                compaction_filter_config_by_key(
738                    "transactions_from_addr",
739                    compaction_metrics.clone(),
740                    db_options.clone(),
741                    pruner_watermark.clone(),
742                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
743                ),
744            ),
745            (
746                "transactions_to_addr".to_string(),
747                compaction_filter_config_by_key(
748                    "transactions_to_addr",
749                    compaction_metrics.clone(),
750                    db_options.clone(),
751                    pruner_watermark.clone(),
752                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
753                ),
754            ),
755            (
756                "transactions_by_move_function".to_string(),
757                compaction_filter_config_by_key(
758                    "transactions_by_move_function",
759                    compaction_metrics.clone(),
760                    db_options.clone(),
761                    pruner_watermark.clone(),
762                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
763                ),
764            ),
765            (
766                "transaction_order".to_string(),
767                compaction_filter_config_by_key(
768                    "transaction_order",
769                    compaction_metrics.clone(),
770                    db_options.clone(),
771                    pruner_watermark.clone(),
772                    |sequence_number: TxSequenceNumber| sequence_number,
773                ),
774            ),
775            (
776                "transactions_seq".to_string(),
777                compaction_filter_config(
778                    "transactions_seq",
779                    compaction_metrics.clone(),
780                    db_options.clone(),
781                    pruner_watermark.clone(),
782                    |sequence_number: TxSequenceNumber| sequence_number,
783                    false,
784                ),
785            ),
786            (
787                "coin_index_2".to_string(),
788                coin_index_table_default_config(),
789            ),
790            (
791                "event_order".to_string(),
792                compaction_filter_config_by_key(
793                    "event_order",
794                    compaction_metrics.clone(),
795                    db_options.clone(),
796                    pruner_watermark.clone(),
797                    |event_id: EventId| event_id.0,
798                ),
799            ),
800            (
801                "event_by_move_module".to_string(),
802                compaction_filter_config_by_key(
803                    "event_by_move_module",
804                    compaction_metrics.clone(),
805                    db_options.clone(),
806                    pruner_watermark.clone(),
807                    |(_, event_id): (ModuleId, EventId)| event_id.0,
808                ),
809            ),
810            (
811                "event_by_event_module".to_string(),
812                compaction_filter_config_by_key(
813                    "event_by_event_module",
814                    compaction_metrics.clone(),
815                    db_options.clone(),
816                    pruner_watermark.clone(),
817                    |(_, event_id): (ModuleId, EventId)| event_id.0,
818                ),
819            ),
820            (
821                "event_by_sender".to_string(),
822                compaction_filter_config_by_key(
823                    "event_by_sender",
824                    compaction_metrics.clone(),
825                    db_options.clone(),
826                    pruner_watermark.clone(),
827                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
828                ),
829            ),
830            (
831                "event_by_time".to_string(),
832                compaction_filter_config_by_key(
833                    "event_by_time",
834                    compaction_metrics.clone(),
835                    db_options.clone(),
836                    pruner_watermark.clone(),
837                    |(_, event_id): (u64, EventId)| event_id.0,
838                ),
839            ),
840        ]));
841        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
842            path,
843            MetricConf::new("index"),
844            Some(db_options.options),
845            Some(table_options),
846            remove_deprecated_tables,
847        );
848
849        let metrics = IndexStoreMetrics::new(registry);
850        let caches = IndexStoreCaches {
851            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
852            all_balances: ShardedLruCache::new(1_000_000, 1000),
853            locks: MutexTable::new(128),
854        };
855        let next_sequence_number = tables
856            .transaction_order
857            .reversed_safe_iter_with_bounds(None, None)
858            .expect("failed to initialize indexes")
859            .next()
860            .transpose()
861            .expect("failed to initialize indexes")
862            .map(|(seq, _)| seq + 1)
863            .unwrap_or(0)
864            .into();
865        let pruner_watermark_value = tables
866            .pruner_watermark
867            .get(&())
868            .expect("failed to initialize index tables")
869            .unwrap_or(0);
870        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
871
872        Self {
873            tables,
874            next_sequence_number,
875            caches,
876            metrics: Arc::new(metrics),
877            max_type_length: max_type_length.unwrap_or(128),
878            remove_deprecated_tables,
879            pruner_watermark,
880        }
881    }
882
883    pub fn new(
884        path: PathBuf,
885        registry: &Registry,
886        max_type_length: Option<u64>,
887        remove_deprecated_tables: bool,
888    ) -> Self {
889        let mut store =
890            Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
891        store.tables.init().unwrap();
892        store
893    }
894
    /// Returns a shared reference to the underlying index tables.
    pub fn tables(&self) -> &IndexStoreTables {
        &self.tables
    }
898
899    #[instrument(skip_all)]
900    pub fn index_coin(
901        &self,
902        digest: &TransactionDigest,
903        batch: &mut StagedBatch,
904        object_index_changes: &ObjectIndexChanges,
905        tx_coins: Option<TxCoins>,
906        acquire_locks: bool,
907    ) -> SuiResult<IndexStoreCacheUpdatesWithLocks> {
908        // In production if this code path is hit, we should expect `tx_coins` to not be None.
909        // However, in many tests today we do not distinguish validator and/or fullnode, so
910        // we gracefully exist here.
911        if tx_coins.is_none() {
912            return Ok(IndexStoreCacheUpdatesWithLocks {
913                _locks: None,
914                inner: IndexStoreCacheUpdates::default(),
915            });
916        }
917
918        let _locks = if acquire_locks {
919            let mut addresses: HashSet<SuiAddress> = HashSet::new();
920            addresses.extend(
921                object_index_changes
922                    .deleted_owners
923                    .iter()
924                    .map(|(owner, _)| *owner),
925            );
926            addresses.extend(
927                object_index_changes
928                    .new_owners
929                    .iter()
930                    .map(|((owner, _), _)| *owner),
931            );
932            Some(self.caches.locks.acquire_locks(addresses.into_iter()))
933        } else {
934            None
935        };
936
937        let (input_coins, written_coins) = tx_coins.unwrap();
938        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
939            HashMap::new();
940
941        // 1. Remove old coins from the DB by looking at the set of input coin objects
942        let coin_delete_keys = input_coins
943            .values()
944            .filter_map(|object| {
945                // only process address owned coins
946                let Owner::AddressOwner(owner) = object.owner() else {
947                    return None;
948                };
949
950                // only process coin types
951                let (coin_type, coin) = object
952                    .coin_type_maybe()
953                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
954
955                let key = CoinIndexKey2::new(
956                    *owner,
957                    coin_type.to_string(),
958                    coin.balance.value(),
959                    object.id(),
960                );
961
962                let map = balance_changes.entry(*owner).or_default();
963                let entry = map.entry(coin_type).or_insert(TotalBalance {
964                    num_coins: 0,
965                    balance: 0,
966                    address_balance: 0,
967                });
968                entry.num_coins -= 1;
969                entry.balance -= coin.balance.value() as i128;
970
971                Some(key)
972            })
973            .collect::<Vec<_>>();
974        trace!(
975            tx_digset=?digest,
976            "coin_delete_keys: {:?}",
977            coin_delete_keys,
978        );
979        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
980
981        // 2. Insert new coins, or new versions of coins, by looking at `written_coins`.
982        let coin_add_keys = written_coins
983            .values()
984            .filter_map(|object| {
985                // only process address owned coins
986                let Owner::AddressOwner(owner) = object.owner() else {
987                    return None;
988                };
989
990                // only process coin types
991                let (coin_type, coin) = object
992                    .coin_type_maybe()
993                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
994
995                let key = CoinIndexKey2::new(
996                    *owner,
997                    coin_type.to_string(),
998                    coin.balance.value(),
999                    object.id(),
1000                );
1001                let value = CoinInfo {
1002                    version: object.version(),
1003                    digest: object.digest(),
1004                    balance: coin.balance.value(),
1005                    previous_transaction: object.previous_transaction,
1006                };
1007                let map = balance_changes.entry(*owner).or_default();
1008                let entry = map.entry(coin_type).or_insert(TotalBalance {
1009                    num_coins: 0,
1010                    balance: 0,
1011                    address_balance: 0,
1012                });
1013                entry.num_coins += 1;
1014                entry.balance += coin.balance.value() as i128;
1015
1016                Some((key, value))
1017            })
1018            .collect::<Vec<_>>();
1019        trace!(
1020            tx_digset=?digest,
1021            "coin_add_keys: {:?}",
1022            coin_add_keys,
1023        );
1024
1025        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
1026
1027        let per_coin_type_balance_changes: Vec<_> = balance_changes
1028            .iter()
1029            .flat_map(|(address, balance_map)| {
1030                balance_map.iter().map(|(type_tag, balance)| {
1031                    (
1032                        (*address, type_tag.clone()),
1033                        Ok::<TotalBalance, SuiError>(*balance),
1034                    )
1035                })
1036            })
1037            .collect();
1038        let all_balance_changes: Vec<_> = balance_changes
1039            .into_iter()
1040            .map(|(address, balance_map)| {
1041                (
1042                    address,
1043                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
1044                )
1045            })
1046            .collect();
1047        let cache_updates = IndexStoreCacheUpdatesWithLocks {
1048            _locks,
1049            inner: IndexStoreCacheUpdates {
1050                per_coin_type_balance_changes,
1051                all_balance_changes,
1052            },
1053        };
1054        Ok(cache_updates)
1055    }
1056
    /// Atomically increments the internal sequence counter by one and returns the value it
    /// held *before* the increment.
    pub fn allocate_sequence_number(&self) -> u64 {
        self.next_sequence_number.fetch_add(1, Ordering::SeqCst)
    }
1060
    /// Builds the staged index batch for a single transaction.
    ///
    /// A fresh [`StagedBatch`] is filled, in this order, with entries for:
    /// `transaction_order`, `transactions_seq`, `transactions_from_addr`, the deprecated
    /// input/mutated object tables (only while `remove_deprecated_tables` is false),
    /// `transactions_by_move_function`, `transactions_to_addr`, the coin index (via
    /// [`Self::index_coin`]), `address_balances`, the owner and dynamic-field indexes, and the
    /// event tables.
    ///
    /// * `sequence` - sequence number under which the transaction is indexed.
    /// * `sender` - key for the `transactions_from_addr` entry.
    /// * `active_inputs` - object ids written to the deprecated input-object table.
    /// * `mutated_objects` - object refs with owners; feeds the deprecated mutated-object table
    ///   and, for owners that resolve to an address, `transactions_to_addr`.
    /// * `move_functions` - `(package, module, function)` triples for the move-function index.
    /// * `events` - events of the transaction; one entry per event goes into each event table.
    /// * `object_index_changes` - owner / dynamic-field deletions and insertions.
    /// * `digest` - digest of the transaction being indexed.
    /// * `timestamp_ms` - timestamp stored with each event entry and used as `event_by_time` key.
    /// * `tx_coins` - forwarded to [`Self::index_coin`].
    /// * `accumulator_events` - events whose type yields a balance type parameter produce
    ///   `address_balances` entries.
    /// * `acquire_locks` - forwarded to [`Self::index_coin`].
    ///
    /// Returns the staged batch together with the cache updates produced by the coin index.
    ///
    /// # Errors
    ///
    /// Returns an error if staging any insert or delete fails, or if coin indexing fails.
    #[instrument(skip_all)]
    pub fn index_tx(
        &self,
        sequence: u64,
        sender: SuiAddress,
        active_inputs: impl Iterator<Item = ObjectID>,
        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
        events: &TransactionEvents,
        object_index_changes: ObjectIndexChanges,
        digest: &TransactionDigest,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        accumulator_events: Vec<AccumulatorEvent>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let mut batch = StagedBatch::new();

        // sequence -> digest
        batch.insert_batch(
            &self.tables.transaction_order,
            std::iter::once((sequence, *digest)),
        )?;

        // digest -> sequence (reverse lookup used to resolve cursors)
        batch.insert_batch(
            &self.tables.transactions_seq,
            std::iter::once((*digest, sequence)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_from_addr,
            std::iter::once(((sender, sequence), *digest)),
        )?;

        // The deprecated per-object tables are only maintained while they are still kept.
        #[allow(deprecated)]
        if !self.remove_deprecated_tables {
            batch.insert_batch(
                &self.tables.transactions_by_input_object_id,
                active_inputs.map(|id| ((id, sequence), *digest)),
            )?;

            batch.insert_batch(
                &self.tables.transactions_by_mutated_object_id,
                mutated_objects
                    .clone()
                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
            )?;
        }

        batch.insert_batch(
            &self.tables.transactions_by_move_function,
            move_functions
                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
        )?;

        // Only owners for which `get_address_owner_address` succeeds produce an entry.
        batch.insert_batch(
            &self.tables.transactions_to_addr,
            mutated_objects.filter_map(|(_, owner)| {
                owner
                    .get_address_owner_address()
                    .ok()
                    .map(|addr| ((addr, sequence), digest))
            }),
        )?;

        // Coin Index
        let cache_updates = self.index_coin(
            digest,
            &mut batch,
            &object_index_changes,
            tx_coins,
            acquire_locks,
        )?;

        // update address balances index
        let address_balance_updates = accumulator_events.into_iter().filter_map(|event| {
            let ty = &event.write.address.ty;
            let coin_type = sui_types::balance::Balance::maybe_get_balance_type_param(ty)?;
            Some(((event.write.address.address, coin_type), ()))
        });
        batch.insert_batch(&self.tables.address_balances, address_balance_updates)?;

        // Owner index
        batch.delete_batch(
            &self.tables.owner_index,
            object_index_changes.deleted_owners.into_iter(),
        )?;
        batch.delete_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.deleted_dynamic_fields.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.owner_index,
            object_index_changes.new_owners.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.new_dynamic_fields.into_iter(),
        )?;

        // events
        // Every event table stores the same value, (event_digest, tx digest, timestamp), keyed
        // by (sequence, index of the event within the transaction) plus a table-specific prefix.
        let event_digest = events.digest();
        batch.insert_batch(
            &self.tables.event_order,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
        )?;
        batch.insert_batch(
            &self.tables.event_by_move_module,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, e)| {
                    (
                        i,
                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
                    )
                })
                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
        )?;
        batch.insert_batch(
            &self.tables.event_by_sender,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.sender, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;
        batch.insert_batch(
            &self.tables.event_by_move_event,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.type_.clone(), (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        batch.insert_batch(
            &self.tables.event_by_time,
            events.data.iter().enumerate().map(|(i, _)| {
                (
                    (timestamp_ms, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        // Keyed by the module of the event's type, as opposed to `event_by_move_module`, which
        // is keyed by the package/module recorded on the event itself.
        batch.insert_batch(
            &self.tables.event_by_event_module,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (
                        ModuleId::new(e.type_.address, e.type_.module.clone()),
                        (sequence, i),
                    ),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        Ok((batch, cache_updates))
    }
1230
1231    /// Write a combined index batch and apply cache updates.
1232    pub fn commit_index_batch(
1233        &self,
1234        batch: DBBatch,
1235        cache_updates: Vec<IndexStoreCacheUpdates>,
1236    ) -> SuiResult {
1237        let invalidate_caches =
1238            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
1239
1240        if invalidate_caches {
1241            for cu in &cache_updates {
1242                self.invalidate_per_coin_type_cache(
1243                    cu.per_coin_type_balance_changes.iter().map(|x| x.0.clone()),
1244                )?;
1245                self.invalidate_all_balance_cache(cu.all_balance_changes.iter().map(|x| x.0))?;
1246            }
1247        }
1248
1249        batch.write()?;
1250
1251        if !invalidate_caches {
1252            for cu in cache_updates {
1253                self.update_per_coin_type_cache(cu.per_coin_type_balance_changes)?;
1254                self.update_all_balance_cache(cu.all_balance_changes)?;
1255            }
1256        }
1257        Ok(())
1258    }
1259
    /// Creates a new [`DBBatch`], obtained through the `transactions_from_addr` table handle.
    pub fn new_db_batch(&self) -> DBBatch {
        self.tables.transactions_from_addr.batch()
    }
1263
    /// Returns the current value of the internal sequence counter plus one, without modifying
    /// the counter.
    ///
    /// NOTE(review): `allocate_sequence_number` hands out the counter value itself, so the
    /// value returned here is one past the number the next allocation will receive. Confirm
    /// with callers whether the `+ 1` is intended.
    pub fn next_sequence_number(&self) -> TxSequenceNumber {
        self.next_sequence_number.load(Ordering::SeqCst) + 1
    }
1267
1268    #[instrument(skip(self))]
1269    pub fn get_transactions(
1270        &self,
1271        filter: Option<TransactionFilter>,
1272        cursor: Option<TransactionDigest>,
1273        limit: Option<usize>,
1274        reverse: bool,
1275    ) -> SuiResult<Vec<TransactionDigest>> {
1276        // Lookup TransactionDigest sequence number,
1277        let cursor = if let Some(cursor) = cursor {
1278            Some(
1279                self.get_transaction_seq(&cursor)?
1280                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
1281            )
1282        } else {
1283            None
1284        };
1285        match filter {
1286            Some(TransactionFilter::MoveFunction {
1287                package,
1288                module,
1289                function,
1290            }) => Ok(self.get_transactions_by_move_function(
1291                package, module, function, cursor, limit, reverse,
1292            )?),
1293            Some(TransactionFilter::InputObject(object_id)) => {
1294                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
1295            }
1296            Some(TransactionFilter::ChangedObject(object_id)) => {
1297                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
1298            }
1299            Some(TransactionFilter::FromAddress(address)) => {
1300                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
1301            }
1302            Some(TransactionFilter::ToAddress(address)) => {
1303                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
1304            }
1305            // NOTE: filter via checkpoint sequence number is implemented in
1306            // `get_transactions` of authority.rs.
1307            Some(_) => Err(SuiErrorKind::UserInputError {
1308                error: UserInputError::Unsupported(format!("{:?}", filter)),
1309            }
1310            .into()),
1311            None => {
1312                if reverse {
1313                    let iter = self
1314                        .tables
1315                        .transaction_order
1316                        .reversed_safe_iter_with_bounds(
1317                            None,
1318                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
1319                        )?
1320                        .skip(usize::from(cursor.is_some()))
1321                        .map(|result| result.map(|(_, digest)| digest));
1322                    if let Some(limit) = limit {
1323                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
1324                    } else {
1325                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
1326                    }
1327                } else {
1328                    let iter = self
1329                        .tables
1330                        .transaction_order
1331                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
1332                        .skip(usize::from(cursor.is_some()))
1333                        .map(|result| result.map(|(_, digest)| digest));
1334                    if let Some(limit) = limit {
1335                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
1336                    } else {
1337                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
1338                    }
1339                }
1340            }
1341        }
1342    }
1343
1344    #[instrument(skip_all)]
1345    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
1346        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
1347        key: KeyT,
1348        cursor: Option<TxSequenceNumber>,
1349        limit: Option<usize>,
1350        reverse: bool,
1351    ) -> SuiResult<Vec<TransactionDigest>> {
1352        Ok(if reverse {
1353            let iter = index
1354                .reversed_safe_iter_with_bounds(
1355                    None,
1356                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
1357                )?
1358                // skip one more if exclusive cursor is Some
1359                .skip(usize::from(cursor.is_some()))
1360                .take_while(|result| {
1361                    result
1362                        .as_ref()
1363                        .map(|((id, _), _)| *id == key)
1364                        .unwrap_or(false)
1365                })
1366                .map(|result| result.map(|(_, digest)| digest));
1367            if let Some(limit) = limit {
1368                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1369            } else {
1370                iter.collect::<Result<Vec<_>, _>>()?
1371            }
1372        } else {
1373            let iter = index
1374                .safe_iter_with_bounds(
1375                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
1376                    None,
1377                )
1378                // skip one more if exclusive cursor is Some
1379                .skip(usize::from(cursor.is_some()))
1380                .map(|result| result.expect("iterator db error"))
1381                .take_while(|((id, _), _)| *id == key)
1382                .map(|(_, digest)| digest);
1383            if let Some(limit) = limit {
1384                iter.take(limit).collect()
1385            } else {
1386                iter.collect()
1387            }
1388        })
1389    }
1390
    /// Returns the digests indexed under `input_object` in the deprecated
    /// `transactions_by_input_object_id` table.
    ///
    /// `cursor` is an exclusive starting sequence number, `limit` caps the number of results
    /// and `reverse` flips the iteration order. When `remove_deprecated_tables` is set, the
    /// table is not consulted and an empty list is returned.
    #[instrument(skip(self))]
    pub fn get_transactions_by_input_object(
        &self,
        input_object: ObjectID,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        // The deprecated table is not read once it has been scheduled for removal.
        if self.remove_deprecated_tables {
            return Ok(vec![]);
        }
        #[allow(deprecated)]
        Self::get_transactions_from_index(
            &self.tables.transactions_by_input_object_id,
            input_object,
            cursor,
            limit,
            reverse,
        )
    }
1411
    /// Returns the digests indexed under `mutated_object` in the deprecated
    /// `transactions_by_mutated_object_id` table.
    ///
    /// `cursor` is an exclusive starting sequence number, `limit` caps the number of results
    /// and `reverse` flips the iteration order. When `remove_deprecated_tables` is set, the
    /// table is not consulted and an empty list is returned.
    #[instrument(skip(self))]
    pub fn get_transactions_by_mutated_object(
        &self,
        mutated_object: ObjectID,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        // The deprecated table is not read once it has been scheduled for removal.
        if self.remove_deprecated_tables {
            return Ok(vec![]);
        }
        #[allow(deprecated)]
        Self::get_transactions_from_index(
            &self.tables.transactions_by_mutated_object_id,
            mutated_object,
            cursor,
            limit,
            reverse,
        )
    }
1432
    /// Returns the digests indexed under `addr` in the `transactions_from_addr` table.
    ///
    /// `cursor` is an exclusive starting sequence number, `limit` caps the number of results
    /// and `reverse` flips the iteration order.
    #[instrument(skip(self))]
    pub fn get_transactions_from_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_from_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }
1449
1450    #[instrument(skip(self))]
1451    pub fn get_transactions_by_move_function(
1452        &self,
1453        package: ObjectID,
1454        module: Option<String>,
1455        function: Option<String>,
1456        cursor: Option<TxSequenceNumber>,
1457        limit: Option<usize>,
1458        reverse: bool,
1459    ) -> SuiResult<Vec<TransactionDigest>> {
1460        // If we are passed a function with no module return a UserInputError
1461        if function.is_some() && module.is_none() {
1462            return Err(SuiErrorKind::UserInputError {
1463                error: UserInputError::MoveFunctionInputError(
1464                    "Cannot supply function without supplying module".to_string(),
1465                ),
1466            }
1467            .into());
1468        }
1469
1470        // We cannot have a cursor without filling out the other keys.
1471        if cursor.is_some() && (module.is_none() || function.is_none()) {
1472            return Err(SuiErrorKind::UserInputError {
1473                error: UserInputError::MoveFunctionInputError(
1474                    "Cannot supply cursor without supplying module and function".to_string(),
1475                ),
1476            }
1477            .into());
1478        }
1479
1480        let cursor_val = cursor.unwrap_or(if reverse {
1481            TxSequenceNumber::MAX
1482        } else {
1483            TxSequenceNumber::MIN
1484        });
1485
1486        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1487        let module_val = module.clone().unwrap_or(if reverse {
1488            max_string.clone()
1489        } else {
1490            "".to_string()
1491        });
1492
1493        let function_val =
1494            function
1495                .clone()
1496                .unwrap_or(if reverse { max_string } else { "".to_string() });
1497
1498        let key = (package, module_val, function_val, cursor_val);
1499        Ok(if reverse {
1500            let iter = self
1501                .tables
1502                .transactions_by_move_function
1503                .reversed_safe_iter_with_bounds(None, Some(key))?
1504                // skip one more if exclusive cursor is Some
1505                .skip(usize::from(cursor.is_some()))
1506                .take_while(|result| {
1507                    result
1508                        .as_ref()
1509                        .map(|((id, m, f, _), _)| {
1510                            *id == package
1511                                && module.as_ref().map(|x| x == m).unwrap_or(true)
1512                                && function.as_ref().map(|x| x == f).unwrap_or(true)
1513                        })
1514                        .unwrap_or(false)
1515                })
1516                .map(|result| result.map(|(_, digest)| digest));
1517            if let Some(limit) = limit {
1518                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1519            } else {
1520                iter.collect::<Result<Vec<_>, _>>()?
1521            }
1522        } else {
1523            let iter = self
1524                .tables
1525                .transactions_by_move_function
1526                .safe_iter_with_bounds(Some(key), None)
1527                .map(|result| result.expect("iterator db error"))
1528                // skip one more if exclusive cursor is Some
1529                .skip(usize::from(cursor.is_some()))
1530                .take_while(|((id, m, f, _), _)| {
1531                    *id == package
1532                        && module.as_ref().map(|x| x == m).unwrap_or(true)
1533                        && function.as_ref().map(|x| x == f).unwrap_or(true)
1534                })
1535                .map(|(_, digest)| digest);
1536            if let Some(limit) = limit {
1537                iter.take(limit).collect()
1538            } else {
1539                iter.collect()
1540            }
1541        })
1542    }
1543
    /// Returns the digests indexed under `addr` in the `transactions_to_addr` table.
    ///
    /// `cursor` is an exclusive starting sequence number, `limit` caps the number of results
    /// and `reverse` flips the iteration order.
    #[instrument(skip(self))]
    pub fn get_transactions_to_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_to_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }
1560
    /// Looks up the sequence number recorded for `digest` in the `transactions_seq` table.
    ///
    /// Returns `Ok(None)` when the table holds no entry for the digest; a failed read is
    /// returned as an error.
    #[instrument(skip(self))]
    pub fn get_transaction_seq(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TxSequenceNumber>> {
        Ok(self.tables.transactions_seq.get(digest)?)
    }
1568
1569    #[instrument(skip(self))]
1570    pub fn all_events(
1571        &self,
1572        tx_seq: TxSequenceNumber,
1573        event_seq: usize,
1574        limit: usize,
1575        descending: bool,
1576    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1577        Ok(if descending {
1578            self.tables
1579                .event_order
1580                .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1581                .take(limit)
1582                .map(|result| {
1583                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1584                        (digest, tx_digest, event_seq, time)
1585                    })
1586                })
1587                .collect::<Result<Vec<_>, _>>()?
1588        } else {
1589            self.tables
1590                .event_order
1591                .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1592                .take(limit)
1593                .map(|result| {
1594                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1595                        (digest, tx_digest, event_seq, time)
1596                    })
1597                })
1598                .collect::<Result<Vec<_>, _>>()?
1599        })
1600    }
1601
1602    #[instrument(skip(self))]
1603    pub fn events_by_transaction(
1604        &self,
1605        digest: &TransactionDigest,
1606        tx_seq: TxSequenceNumber,
1607        event_seq: usize,
1608        limit: usize,
1609        descending: bool,
1610    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1611        let seq = self
1612            .get_transaction_seq(digest)?
1613            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1614        Ok(if descending {
1615            self.tables
1616                .event_order
1617                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1618                .take_while(|result| {
1619                    result
1620                        .as_ref()
1621                        .map(|((tx, _), _)| tx == &seq)
1622                        .unwrap_or(false)
1623                })
1624                .take(limit)
1625                .map(|result| {
1626                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1627                        (digest, tx_digest, event_seq, time)
1628                    })
1629                })
1630                .collect::<Result<Vec<_>, _>>()?
1631        } else {
1632            self.tables
1633                .event_order
1634                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1635                .map(|result| result.expect("iterator db error"))
1636                .take_while(|((tx, _), _)| tx == &seq)
1637                .take(limit)
1638                .map(|((_, event_seq), (digest, tx_digest, time))| {
1639                    (digest, tx_digest, event_seq, time)
1640                })
1641                .collect()
1642        })
1643    }
1644
1645    #[instrument(skip_all)]
1646    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1647        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1648        key: &KeyT,
1649        tx_seq: TxSequenceNumber,
1650        event_seq: usize,
1651        limit: usize,
1652        descending: bool,
1653    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1654        Ok(if descending {
1655            index
1656                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1657                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1658                .take(limit)
1659                .map(|result| {
1660                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1661                        (digest, tx_digest, event_seq, time)
1662                    })
1663                })
1664                .collect::<Result<Vec<_>, _>>()?
1665        } else {
1666            index
1667                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1668                .map(|result| result.expect("iterator db error"))
1669                .take_while(|((m, _), _)| m == key)
1670                .take(limit)
1671                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1672                    (digest, tx_digest, event_seq, time)
1673                })
1674                .collect()
1675        })
1676    }
1677
1678    #[instrument(skip(self))]
1679    pub fn events_by_module_id(
1680        &self,
1681        module: &ModuleId,
1682        tx_seq: TxSequenceNumber,
1683        event_seq: usize,
1684        limit: usize,
1685        descending: bool,
1686    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1687        Self::get_event_from_index(
1688            &self.tables.event_by_move_module,
1689            module,
1690            tx_seq,
1691            event_seq,
1692            limit,
1693            descending,
1694        )
1695    }
1696
1697    #[instrument(skip(self))]
1698    pub fn events_by_move_event_struct_name(
1699        &self,
1700        struct_name: &StructTag,
1701        tx_seq: TxSequenceNumber,
1702        event_seq: usize,
1703        limit: usize,
1704        descending: bool,
1705    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1706        Self::get_event_from_index(
1707            &self.tables.event_by_move_event,
1708            struct_name,
1709            tx_seq,
1710            event_seq,
1711            limit,
1712            descending,
1713        )
1714    }
1715
1716    #[instrument(skip(self))]
1717    pub fn events_by_move_event_module(
1718        &self,
1719        module_id: &ModuleId,
1720        tx_seq: TxSequenceNumber,
1721        event_seq: usize,
1722        limit: usize,
1723        descending: bool,
1724    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1725        Self::get_event_from_index(
1726            &self.tables.event_by_event_module,
1727            module_id,
1728            tx_seq,
1729            event_seq,
1730            limit,
1731            descending,
1732        )
1733    }
1734
1735    #[instrument(skip(self))]
1736    pub fn events_by_sender(
1737        &self,
1738        sender: &SuiAddress,
1739        tx_seq: TxSequenceNumber,
1740        event_seq: usize,
1741        limit: usize,
1742        descending: bool,
1743    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1744        Self::get_event_from_index(
1745            &self.tables.event_by_sender,
1746            sender,
1747            tx_seq,
1748            event_seq,
1749            limit,
1750            descending,
1751        )
1752    }
1753
1754    #[instrument(skip(self))]
1755    pub fn event_iterator(
1756        &self,
1757        start_time: u64,
1758        end_time: u64,
1759        tx_seq: TxSequenceNumber,
1760        event_seq: usize,
1761        limit: usize,
1762        descending: bool,
1763    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1764        Ok(if descending {
1765            self.tables
1766                .event_by_time
1767                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1768                .take_while(|result| {
1769                    result
1770                        .as_ref()
1771                        .map(|((m, _), _)| m >= &start_time)
1772                        .unwrap_or(false)
1773                })
1774                .take(limit)
1775                .map(|result| {
1776                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1777                        (digest, tx_digest, event_seq, time)
1778                    })
1779                })
1780                .collect::<Result<Vec<_>, _>>()?
1781        } else {
1782            self.tables
1783                .event_by_time
1784                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1785                .map(|result| result.expect("iterator db error"))
1786                .take_while(|((m, _), _)| m <= &end_time)
1787                .take(limit)
1788                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1789                    (digest, tx_digest, event_seq, time)
1790                })
1791                .collect()
1792        })
1793    }
1794
1795    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1796        match self
1797            .tables
1798            .event_by_time
1799            .reversed_safe_iter_with_bounds(
1800                None,
1801                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1802            )?
1803            .next()
1804            .transpose()?
1805        {
1806            Some(((_, (watermark, _)), _)) => {
1807                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1808                    info!(
1809                        "json rpc index pruning. Watermark is {} with digest {}",
1810                        watermark, digest
1811                    );
1812                }
1813                self.pruner_watermark.store(watermark, Ordering::Relaxed);
1814                self.tables.pruner_watermark.insert(&(), &watermark)?;
1815                Ok(watermark)
1816            }
1817            None => Ok(0),
1818        }
1819    }
1820
1821    pub fn get_dynamic_fields_iterator(
1822        &self,
1823        object: ObjectID,
1824        cursor: Option<ObjectID>,
1825    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1826    {
1827        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
1828    }
1829
1830    #[instrument(skip(self))]
1831    pub fn get_dynamic_field_object_id(
1832        &self,
1833        object: ObjectID,
1834        name_type: TypeTag,
1835        name_bcs_bytes: &[u8],
1836    ) -> SuiResult<Option<ObjectID>> {
1837        debug!(?object, "get_dynamic_field_object_id");
1838        let dynamic_field_id =
1839            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1840                |e| {
1841                    SuiErrorKind::Unknown(format!(
1842                        "Unable to generate dynamic field id. Got error: {e:?}"
1843                    ))
1844                },
1845            )?;
1846
1847        if let Some(info) = self
1848            .tables
1849            .dynamic_field_index
1850            .get(&(object, dynamic_field_id))?
1851        {
1852            // info.object_id != dynamic_field_id ==> is_wrapper
1853            debug_assert!(
1854                info.object_id == dynamic_field_id
1855                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1856            );
1857            return Ok(Some(info.object_id));
1858        }
1859
1860        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1861        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1862        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1863            object,
1864            &dynamic_object_field_type,
1865            name_bcs_bytes,
1866        )
1867        .map_err(|e| {
1868            SuiErrorKind::Unknown(format!(
1869                "Unable to generate dynamic field id. Got error: {e:?}"
1870            ))
1871        })?;
1872        if let Some(info) = self
1873            .tables
1874            .dynamic_field_index
1875            .get(&(object, dynamic_object_field_id))?
1876        {
1877            return Ok(Some(info.object_id));
1878        }
1879
1880        Ok(None)
1881    }
1882
1883    #[instrument(skip(self))]
1884    pub fn get_owner_objects(
1885        &self,
1886        owner: SuiAddress,
1887        cursor: Option<ObjectID>,
1888        limit: usize,
1889        filter: Option<SuiObjectDataFilter>,
1890    ) -> SuiResult<Vec<ObjectInfo>> {
1891        let cursor = match cursor {
1892            Some(cursor) => cursor,
1893            None => ObjectID::ZERO,
1894        };
1895        Ok(self
1896            .get_owner_objects_iterator(owner, cursor, filter)?
1897            .take(limit)
1898            .collect())
1899    }
1900
1901    pub fn get_address_balance_coin_types_iter(
1902        &self,
1903        owner: SuiAddress,
1904    ) -> impl Iterator<Item = TypeTag> {
1905        let start_key = (owner, TypeTag::Bool);
1906        self.tables()
1907            .address_balances
1908            .safe_iter_with_bounds(Some(start_key), None)
1909            .map(|result| result.expect("iterator db error"))
1910            .take_while(move |(key, _)| key.0 == owner)
1911            .map(|(key, _)| key.1)
1912    }
1913
1914    pub fn get_owned_coins_iterator(
1915        coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1916        owner: SuiAddress,
1917        coin_type_tag: Option<String>,
1918    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1919        let all_coins = coin_type_tag.is_none();
1920        let starting_coin_type =
1921            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1922        let start_key =
1923            CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1924        Ok(coin_index
1925            .safe_iter_with_bounds(Some(start_key), None)
1926            .map(|result| result.expect("iterator db error"))
1927            .take_while(move |(key, _)| {
1928                if key.owner != owner {
1929                    return false;
1930                }
1931                if !all_coins && starting_coin_type != key.coin_type {
1932                    return false;
1933                }
1934                true
1935            }))
1936    }
1937
1938    pub fn get_owned_coins_iterator_with_cursor(
1939        &self,
1940        owner: SuiAddress,
1941        cursor: (String, u64, ObjectID),
1942        limit: usize,
1943        one_coin_type_only: bool,
1944    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1945        let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1946        let start_key = CoinIndexKey2::new_from_cursor(
1947            owner,
1948            starting_coin_type.clone(),
1949            inverted_balance,
1950            starting_object_id,
1951        );
1952        Ok(self
1953            .tables
1954            .coin_index_2
1955            .safe_iter_with_bounds(Some(start_key), None)
1956            .map(|result| result.expect("iterator db error"))
1957            .filter(move |(key, _)| key.object_id != starting_object_id)
1958            .enumerate()
1959            .take_while(move |(index, (key, _))| {
1960                if *index >= limit {
1961                    return false;
1962                }
1963                if key.owner != owner {
1964                    return false;
1965                }
1966                if one_coin_type_only && starting_coin_type != key.coin_type {
1967                    return false;
1968                }
1969                true
1970            })
1971            .map(|(_index, (key, info))| (key, info)))
1972    }
1973
1974    /// starting_object_id can be used to implement pagination, where a client remembers the last
1975    /// object id of each page, and use it to query the next page.
1976    pub fn get_owner_objects_iterator(
1977        &self,
1978        owner: SuiAddress,
1979        starting_object_id: ObjectID,
1980        filter: Option<SuiObjectDataFilter>,
1981    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1982        Ok(self
1983            .tables
1984            .owner_index
1985            // The object id 0 is the smallest possible
1986            .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1987            .map(|result| result.expect("iterator db error"))
1988            .skip(usize::from(starting_object_id != ObjectID::ZERO))
1989            .take_while(move |((address_owner, _), _)| address_owner == &owner)
1990            .filter(move |(_, o)| {
1991                if let Some(filter) = filter.as_ref() {
1992                    filter.matches(o)
1993                } else {
1994                    true
1995                }
1996            })
1997            .map(|(_, object_info)| object_info))
1998    }
1999
2000    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
2001        let mut batch = self.tables.owner_index.batch();
2002        batch.insert_batch(
2003            &self.tables.owner_index,
2004            object_index_changes.new_owners.into_iter(),
2005        )?;
2006        batch.insert_batch(
2007            &self.tables.dynamic_field_index,
2008            object_index_changes.new_dynamic_fields.into_iter(),
2009        )?;
2010        batch.write()?;
2011        Ok(())
2012    }
2013
2014    pub fn is_empty(&self) -> bool {
2015        self.tables.owner_index.is_empty()
2016    }
2017
2018    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
2019        // We are checkpointing the whole db
2020        self.tables
2021            .transactions_from_addr
2022            .checkpoint_db(path)
2023            .map_err(Into::into)
2024    }
2025
2026    /// This method first gets the balance from `per_coin_type_balance` cache. On a cache miss, it
2027    /// gets the balance for passed in `coin_type` from the `all_balance` cache. Only on the second
2028    /// cache miss, we go to the database (expensive) and update the cache. Notice that db read is
2029    /// done with `spawn_blocking` as that is expected to block
2030    #[instrument(skip(self))]
2031    pub fn get_coin_object_balance(
2032        &self,
2033        owner: SuiAddress,
2034        coin_type: TypeTag,
2035    ) -> SuiResult<TotalBalance> {
2036        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2037        let cloned_coin_type = coin_type.clone();
2038        let metrics_cloned = self.metrics.clone();
2039        let coin_index_cloned = self.tables.coin_index_2.clone();
2040        if force_disable_cache {
2041            return Self::get_balance_from_db(
2042                metrics_cloned,
2043                coin_index_cloned,
2044                owner,
2045                cloned_coin_type,
2046            )
2047            .map_err(|e| {
2048                SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2049                    .into()
2050            });
2051        }
2052
2053        self.metrics.balance_lookup_from_total.inc();
2054
2055        let balance = self
2056            .caches
2057            .per_coin_type_balance
2058            .get(&(owner, coin_type.clone()));
2059        if let Some(balance) = balance {
2060            return balance;
2061        }
2062        // cache miss, lookup in all balance cache
2063        let all_balance = self.caches.all_balances.get(&owner.clone());
2064        if let Some(Ok(all_balance)) = all_balance
2065            && let Some(balance) = all_balance.get(&coin_type)
2066        {
2067            return Ok(*balance);
2068        }
2069        let cloned_coin_type = coin_type.clone();
2070        let metrics_cloned = self.metrics.clone();
2071        let coin_index_cloned = self.tables.coin_index_2.clone();
2072        self.caches
2073            .per_coin_type_balance
2074            .get_with((owner, coin_type), move || {
2075                Self::get_balance_from_db(
2076                    metrics_cloned,
2077                    coin_index_cloned,
2078                    owner,
2079                    cloned_coin_type,
2080                )
2081                .map_err(|e| {
2082                    SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2083                        .into()
2084                })
2085            })
2086    }
2087
2088    /// This method gets the balance for all coin types from the `all_balance` cache. On a cache miss,
2089    /// we go to the database (expensive) and update the cache. This cache is dual purpose in the
2090    /// sense that it not only serves `get_AllBalance()` calls but is also used for serving
2091    /// `get_Balance()` queries. Notice that db read is performed with `spawn_blocking` as that is
2092    /// expected to block
2093    #[instrument(skip(self))]
2094    pub fn get_all_coin_object_balances(
2095        &self,
2096        owner: SuiAddress,
2097    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2098        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2099        let metrics_cloned = self.metrics.clone();
2100        let coin_index_cloned = self.tables.coin_index_2.clone();
2101        if force_disable_cache {
2102            return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
2103                .map_err(|e| {
2104                    SuiErrorKind::ExecutionError(format!(
2105                        "Failed to read all balance from DB: {:?}",
2106                        e
2107                    ))
2108                    .into()
2109                });
2110        }
2111
2112        self.metrics.all_balance_lookup_from_total.inc();
2113        let metrics_cloned = self.metrics.clone();
2114        let coin_index_cloned = self.tables.coin_index_2.clone();
2115        self.caches.all_balances.get_with(owner, move || {
2116            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
2117                SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
2118                    .into()
2119            })
2120        })
2121    }
2122
2123    /// Read balance for a `SuiAddress` and `CoinType` from the backend database
2124    #[instrument(skip_all)]
2125    pub fn get_balance_from_db(
2126        metrics: Arc<IndexStoreMetrics>,
2127        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2128        owner: SuiAddress,
2129        coin_type: TypeTag,
2130    ) -> SuiResult<TotalBalance> {
2131        metrics.balance_lookup_from_db.inc();
2132        let coin_type_str = coin_type.to_string();
2133        let coins =
2134            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
2135
2136        let mut balance = 0i128;
2137        let mut num_coins = 0;
2138        for (_key, coin_info) in coins {
2139            balance += coin_info.balance as i128;
2140            num_coins += 1;
2141        }
2142        Ok(TotalBalance {
2143            balance,
2144            num_coins,
2145            address_balance: 0,
2146        })
2147    }
2148
2149    /// Read all balances for a `SuiAddress` from the backend database
2150    #[instrument(skip_all)]
2151    pub fn get_all_balances_from_db(
2152        metrics: Arc<IndexStoreMetrics>,
2153        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2154        owner: SuiAddress,
2155    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2156        metrics.all_balance_lookup_from_db.inc();
2157        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
2158        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
2159            .chunk_by(|(key, _coin)| key.coin_type.clone());
2160        for (coin_type, coins) in &coins {
2161            let mut total_balance = 0i128;
2162            let mut coin_object_count = 0;
2163            for (_, coin_info) in coins {
2164                total_balance += coin_info.balance as i128;
2165                coin_object_count += 1;
2166            }
2167            let coin_type =
2168                TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
2169                    SuiErrorKind::ExecutionError(format!(
2170                        "Failed to parse event sender address: {:?}",
2171                        e
2172                    ))
2173                })?));
2174            balances.insert(
2175                coin_type,
2176                TotalBalance {
2177                    num_coins: coin_object_count,
2178                    balance: total_balance,
2179                    address_balance: 0,
2180                },
2181            );
2182        }
2183        Ok(Arc::new(balances))
2184    }
2185
2186    fn invalidate_per_coin_type_cache(
2187        &self,
2188        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
2189    ) -> SuiResult {
2190        self.caches.per_coin_type_balance.batch_invalidate(keys);
2191        Ok(())
2192    }
2193
2194    fn invalidate_all_balance_cache(
2195        &self,
2196        addresses: impl IntoIterator<Item = SuiAddress>,
2197    ) -> SuiResult {
2198        self.caches.all_balances.batch_invalidate(addresses);
2199        Ok(())
2200    }
2201
2202    fn update_per_coin_type_cache(
2203        &self,
2204        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
2205    ) -> SuiResult {
2206        self.caches
2207            .per_coin_type_balance
2208            .batch_merge(keys, Self::merge_balance);
2209        Ok(())
2210    }
2211
2212    fn merge_balance(
2213        old_balance: &SuiResult<TotalBalance>,
2214        balance_delta: &SuiResult<TotalBalance>,
2215    ) -> SuiResult<TotalBalance> {
2216        if let Ok(old_balance) = old_balance {
2217            if let Ok(balance_delta) = balance_delta {
2218                Ok(TotalBalance {
2219                    balance: old_balance.balance + balance_delta.balance,
2220                    num_coins: old_balance.num_coins + balance_delta.num_coins,
2221                    address_balance: old_balance.address_balance,
2222                })
2223            } else {
2224                balance_delta.clone()
2225            }
2226        } else {
2227            old_balance.clone()
2228        }
2229    }
2230
2231    fn update_all_balance_cache(
2232        &self,
2233        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
2234    ) -> SuiResult {
2235        self.caches
2236            .all_balances
2237            .batch_merge(keys, Self::merge_all_balance);
2238        Ok(())
2239    }
2240
2241    fn merge_all_balance(
2242        old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2243        balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2244    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2245        if let Ok(old_balance) = old_balance {
2246            if let Ok(balance_delta) = balance_delta {
2247                let mut new_balance = HashMap::new();
2248                for (key, value) in old_balance.iter() {
2249                    new_balance.insert(key.clone(), *value);
2250                }
2251                for (key, delta) in balance_delta.iter() {
2252                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
2253                        balance: 0,
2254                        num_coins: 0,
2255                        address_balance: 0,
2256                    });
2257                    let new_total = TotalBalance {
2258                        balance: old.balance + delta.balance,
2259                        num_coins: old.num_coins + delta.num_coins,
2260                        address_balance: old.address_balance,
2261                    };
2262                    new_balance.insert(key.clone(), new_total);
2263                }
2264                Ok(Arc::new(new_balance))
2265            } else {
2266                balance_delta.clone()
2267            }
2268        } else {
2269            old_balance.clone()
2270        }
2271    }
2272}
2273
2274#[cfg(test)]
2275mod tests {
2276    use super::IndexStore;
2277    use super::ObjectIndexChanges;
2278    use move_core_types::account_address::AccountAddress;
2279    use prometheus::Registry;
2280    use std::collections::BTreeMap;
2281    use std::env::temp_dir;
2282    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
2283    use sui_types::digests::TransactionDigest;
2284    use sui_types::effects::TransactionEvents;
2285    use sui_types::gas_coin::GAS;
2286    use sui_types::object;
2287    use sui_types::object::Owner;
2288
    #[tokio::test]
    async fn test_index_cache() -> anyhow::Result<()> {
        // This test is going to invoke `index_tx()` where 10 coins each with balance 100
        // are going to be added to an address. The balance is then going to be read from the db
        // and the cache. It should be 1000. Then, we are going to delete 3 of those coins from
        // the address and invoke `index_tx()` again and read balance. The balance should be 700
        // and verified from both db and cache.
        // This test makes sure we are invalidating entries in the cache and always reading latest
        // balance.
        let index_store =
            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
        let address: SuiAddress = AccountAddress::random().into();
        let mut written_objects = BTreeMap::new();
        let mut input_objects = BTreeMap::new();
        let mut object_map = BTreeMap::new();

        // Phase 1: create 10 gas coins of balance 100 owned by `address`.
        let mut new_objects = vec![];
        for _i in 0..10 {
            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
            new_objects.push((
                (address, object.id()),
                ObjectInfo {
                    object_id: object.id(),
                    version: object.version(),
                    digest: object.digest(),
                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
                    owner: Owner::AddressOwner(address),
                    previous_transaction: object.previous_transaction,
                },
            ));
            // `object_map` keeps every coin so that some can be used as deleted inputs later.
            object_map.insert(object.id(), object.clone());
            written_objects.insert(object.data.id(), object);
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners: new_objects,
            new_dynamic_fields: vec![],
        };

        // No input coins yet; all 10 coins are written by this transaction.
        let tx_coins = (input_objects.clone(), written_objects.clone());
        let seq = index_store.allocate_sequence_number();
        let (raw_batch, cache_updates) = index_store.index_tx(
            seq,
            address,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            object_index_changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
            vec![],
            false,
        )?;
        // Commit the batch so subsequent reads see the data
        let mut db_batch = index_store.new_db_batch();
        db_batch.concat(vec![raw_batch]).unwrap();
        index_store
            .commit_index_batch(db_batch, vec![cache_updates.into_inner()])
            .unwrap();

        // The db read and the cached read must agree: 10 coins, 1000 in total.
        let balance_from_db = IndexStore::get_balance_from_db(
            index_store.metrics.clone(),
            index_store.tables.coin_index_2.clone(),
            address,
            GAS::type_tag(),
        )?;
        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        // The all-balances view must report the same totals for the gas coin type.
        let all_balance = index_store.get_all_coin_object_balances(address)?;
        let balance = all_balance.get(&GAS::type_tag()).unwrap();
        assert_eq!(*balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        // Phase 2: delete 3 of the coins. They appear as inputs and nothing is written.
        written_objects.clear();
        let mut deleted_objects = vec![];
        for (id, object) in object_map.iter().take(3) {
            deleted_objects.push((address, *id));
            input_objects.insert(*id, object.to_owned());
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: deleted_objects.clone(),
            deleted_dynamic_fields: vec![],
            new_owners: vec![],
            new_dynamic_fields: vec![],
        };
        let tx_coins = (input_objects, written_objects);
        let seq = index_store.allocate_sequence_number();
        let (raw_batch, cache_updates) = index_store.index_tx(
            seq,
            address,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            object_index_changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
            vec![],
            false,
        )?;
        // Commit the second batch together with its cache updates.
        let mut db_batch = index_store.new_db_batch();
        db_batch.concat(vec![raw_batch]).unwrap();
        index_store
            .commit_index_batch(db_batch, vec![cache_updates.into_inner()])
            .unwrap();
        // After the deletion both reads must report 7 coins, 700 in total.
        let balance_from_db = IndexStore::get_balance_from_db(
            index_store.metrics.clone(),
            index_store.tables.coin_index_2.clone(),
            address,
            GAS::type_tag(),
        )?;
        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);
        // Invalidate per coin type balance cache and read from all balance cache to ensure
        // the balance matches
        index_store
            .caches
            .per_coin_type_balance
            .invalidate(&(address, GAS::type_tag()));
        let all_balance = index_store.get_all_coin_object_balances(address)?;
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);

        Ok(())
    }
2428
2429    #[tokio::test]
2430    async fn test_get_transaction_by_move_function() {
2431        use sui_types::base_types::ObjectID;
2432        use typed_store::Map;
2433
2434        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
2435        let db = &index_store.tables.transactions_by_move_function;
2436        db.insert(
2437            &(
2438                ObjectID::new([1; 32]),
2439                "mod".to_string(),
2440                "f".to_string(),
2441                0,
2442            ),
2443            &[0; 32].into(),
2444        )
2445        .unwrap();
2446        db.insert(
2447            &(
2448                ObjectID::new([1; 32]),
2449                "mod".to_string(),
2450                "Z".repeat(128),
2451                0,
2452            ),
2453            &[1; 32].into(),
2454        )
2455        .unwrap();
2456        db.insert(
2457            &(
2458                ObjectID::new([1; 32]),
2459                "mod".to_string(),
2460                "f".repeat(128),
2461                0,
2462            ),
2463            &[2; 32].into(),
2464        )
2465        .unwrap();
2466        db.insert(
2467            &(
2468                ObjectID::new([1; 32]),
2469                "mod".to_string(),
2470                "z".repeat(128),
2471                0,
2472            ),
2473            &[3; 32].into(),
2474        )
2475        .unwrap();
2476
2477        let mut v = index_store
2478            .get_transactions_by_move_function(
2479                ObjectID::new([1; 32]),
2480                Some("mod".to_string()),
2481                None,
2482                None,
2483                None,
2484                false,
2485            )
2486            .unwrap();
2487        let v_rev = index_store
2488            .get_transactions_by_move_function(
2489                ObjectID::new([1; 32]),
2490                Some("mod".to_string()),
2491                None,
2492                None,
2493                None,
2494                true,
2495            )
2496            .unwrap();
2497        v.reverse();
2498        assert_eq!(v, v_rev);
2499    }
2500}