sui_core/
jsonrpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! IndexStore supports creation of various ancillary indexes of state in SuiDataStore.
5//! The main user of this data is the explorer.
6
7use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use mysten_common::ZipDebugEqIteratorExt;
17use parking_lot::ArcMutexGuard;
18use prometheus::{
19    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
20    register_int_counter_with_registry,
21};
22use serde::{Deserialize, Serialize, de::DeserializeOwned};
23use sui_types::accumulator_event::AccumulatorEvent;
24use typed_store::TypedStoreError;
25use typed_store::rocksdb::compaction_filter::Decision;
26
27use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
28use sui_storage::mutex_table::MutexTable;
29use sui_storage::sharded_lru::ShardedLruCache;
30use sui_types::base_types::{
31    ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
32};
33use sui_types::base_types::{ObjectInfo, ObjectRef};
34use sui_types::digests::TransactionEventsDigest;
35use sui_types::dynamic_field::{self, DynamicFieldInfo};
36use sui_types::effects::TransactionEvents;
37use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
38use sui_types::inner_temporary_store::TxCoins;
39use sui_types::object::{Object, Owner};
40use sui_types::parse_sui_struct_tag;
41use sui_types::storage::error::Error as StorageError;
42use tracing::{debug, info, instrument, trace};
43use typed_store::DBMapUtils;
44use typed_store::rocks::{
45    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, StagedBatch, default_db_options,
46    read_size_from_env,
47};
48use typed_store::traits::Map;
49
50type OwnerIndexKey = (SuiAddress, ObjectID);
51type DynamicFieldKey = (ObjectID, ObjectID);
52type EventId = (TxSequenceNumber, usize);
53type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
54type AllBalance = HashMap<TypeTag, TotalBalance>;
55
56#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
57pub struct CoinIndexKey2 {
58    pub owner: SuiAddress,
59    pub coin_type: String,
60    // the balance of the coin inverted `!coin.balance` in order to force sorting of coins to be
61    // from greatest to least
62    pub inverted_balance: u64,
63    pub object_id: ObjectID,
64}
65
66impl CoinIndexKey2 {
67    pub fn new_from_cursor(
68        owner: SuiAddress,
69        coin_type: String,
70        inverted_balance: u64,
71        object_id: ObjectID,
72    ) -> Self {
73        Self {
74            owner,
75            coin_type,
76            inverted_balance,
77            object_id,
78        }
79    }
80
81    pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
82        Self {
83            owner,
84            coin_type,
85            inverted_balance: !balance,
86            object_id,
87        }
88    }
89}
90
91const CURRENT_DB_VERSION: u64 = 0;
92const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
93
94#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
95struct MetadataInfo {
96    /// Version of the Database
97    version: u64,
98    /// Version of each of the column families
99    ///
100    /// This is used to version individual column families to determine if a CF needs to be
101    /// (re)initialized on startup.
102    column_families: BTreeMap<String, ColumnFamilyInfo>,
103}
104
105#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
106struct ColumnFamilyInfo {
107    version: u64,
108}
109
110pub const MAX_TX_RANGE_SIZE: u64 = 4096;
111
112pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
113const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
114const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
115const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
116
117#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
118pub struct TotalBalance {
119    pub balance: i128,
120    pub num_coins: i64,
121    pub address_balance: u64,
122}
123
124#[derive(Debug)]
125pub struct ObjectIndexChanges {
126    pub deleted_owners: Vec<OwnerIndexKey>,
127    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
128    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
129    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
130}
131
132#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
133pub struct CoinInfo {
134    pub version: SequenceNumber,
135    pub digest: ObjectDigest,
136    pub balance: u64,
137    pub previous_transaction: TransactionDigest,
138}
139
140impl CoinInfo {
141    pub fn from_object(object: &Object) -> Option<CoinInfo> {
142        object.as_coin_maybe().map(|coin| CoinInfo {
143            version: object.version(),
144            digest: object.digest(),
145            previous_transaction: object.previous_transaction,
146            balance: coin.value(),
147        })
148    }
149}
150
151pub struct IndexStoreMetrics {
152    balance_lookup_from_db: IntCounter,
153    balance_lookup_from_total: IntCounter,
154    all_balance_lookup_from_db: IntCounter,
155    all_balance_lookup_from_total: IntCounter,
156}
157
158impl IndexStoreMetrics {
159    pub fn new(registry: &Registry) -> IndexStoreMetrics {
160        Self {
161            balance_lookup_from_db: register_int_counter_with_registry!(
162                "balance_lookup_from_db",
163                "Total number of balance requests served from cache",
164                registry,
165            )
166            .unwrap(),
167            balance_lookup_from_total: register_int_counter_with_registry!(
168                "balance_lookup_from_total",
169                "Total number of balance requests served ",
170                registry,
171            )
172            .unwrap(),
173            all_balance_lookup_from_db: register_int_counter_with_registry!(
174                "all_balance_lookup_from_db",
175                "Total number of all balance requests served from cache",
176                registry,
177            )
178            .unwrap(),
179            all_balance_lookup_from_total: register_int_counter_with_registry!(
180                "all_balance_lookup_from_total",
181                "Total number of all balance requests served",
182                registry,
183            )
184            .unwrap(),
185        }
186    }
187}
188
189pub struct IndexStoreCaches {
190    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
191    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
192    pub locks: MutexTable<SuiAddress>,
193}
194
195type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
196
197/// Cache updates with optional locks held. Returned from `index_tx`.
198/// In sync mode, locks are acquired and held until the batch is committed.
199/// In async mode, locks are None and this is converted to `IndexStoreCacheUpdates`
200/// via `into_inner()` before sending across threads.
201pub struct IndexStoreCacheUpdatesWithLocks {
202    pub(crate) _locks: Option<Vec<OwnedMutexGuard<()>>>,
203    pub(crate) inner: IndexStoreCacheUpdates,
204}
205
206impl IndexStoreCacheUpdatesWithLocks {
207    pub fn into_inner(self) -> IndexStoreCacheUpdates {
208        self.inner
209    }
210}
211
212/// Send-safe cache updates without locks, used in the async post-processing channel
213/// and by `commit_index_batch`.
214#[derive(Default)]
215pub struct IndexStoreCacheUpdates {
216    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
217    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
218}
219
220#[derive(DBMapUtils)]
221pub struct IndexStoreTables {
222    /// A singleton that store metadata information on the DB.
223    ///
224    /// A few uses for this singleton:
225    /// - determining if the DB has been initialized (as some tables could still be empty post
226    ///   initialization)
227    /// - version of each column family and their respective initialization status
228    meta: DBMap<(), MetadataInfo>,
229
230    /// Index from sui address to transactions initiated by that address.
231    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
232
233    /// Index from sui address to transactions that were sent to that address.
234    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
235
236    /// Index from object id to transactions that used that object id as input.
237    #[deprecated]
238    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
239
240    /// Index from object id to transactions that modified/created that object id.
241    #[deprecated]
242    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
243
244    /// Index from package id, module and function identifier to transactions that used that moce function call as input.
245    transactions_by_move_function:
246        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,
247
248    /// Ordering of all indexed transactions.
249    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,
250
251    /// Index from transaction digest to sequence number.
252    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,
253
254    /// This is an index of object references to currently existing objects, indexed by the
255    /// composite key of the SuiAddress of their owner and the object ID of the object.
256    /// This composite index allows an efficient iterator to list all objected currently owned
257    /// by a specific user, and their object reference.
258    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,
259
260    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
261    // Simple index that just tracks the existance of an address balance for an address.
262    address_balances: DBMap<(SuiAddress, TypeTag), ()>,
263
264    /// This is an index of object references to currently existing dynamic field object, indexed by the
265    /// composite key of the object ID of their parent and the object ID of the dynamic field object.
266    /// This composite index allows an efficient iterator to list all objects currently owned
267    /// by a specific object, and their object reference.
268    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,
269
270    event_order: DBMap<EventId, EventIndex>,
271    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
272    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
273    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
274    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
275    event_by_time: DBMap<(u64, EventId), EventIndex>,
276
277    pruner_watermark: DBMap<(), TxSequenceNumber>,
278}
279
280impl IndexStoreTables {
281    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
282        &self.owner_index
283    }
284
285    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
286        &self.coin_index_2
287    }
288
289    pub fn check_databases_equal(&self, other: &IndexStoreTables) {
290        fn assert_tables_equal<K, V>(name: &str, table_a: &DBMap<K, V>, table_b: &DBMap<K, V>)
291        where
292            K: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
293            V: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
294        {
295            let mut iter_a = table_a.safe_iter();
296            let mut iter_b = table_b.safe_iter();
297            let mut count = 0u64;
298            loop {
299                match (iter_a.next(), iter_b.next()) {
300                    (Some(a), Some(b)) => {
301                        let (ka, va): (K, V) = a.expect("failed to read from table_a");
302                        let (kb, vb): (K, V) = b.expect("failed to read from table_b");
303                        assert!(
304                            ka == kb && va == vb,
305                            "{name}: mismatch at entry {count}:\n  a=({ka:?}, {va:?})\n  b=({kb:?}, {vb:?})"
306                        );
307                        count += 1;
308                    }
309                    (None, None) => break,
310                    (Some(_), None) => {
311                        panic!(
312                            "{name}: table_a has more entries than table_b (diverged after {count} entries)"
313                        );
314                    }
315                    (None, Some(_)) => {
316                        panic!(
317                            "{name}: table_b has more entries than table_a (diverged after {count} entries)"
318                        );
319                    }
320                }
321            }
322            info!("{name}: verified {count} entries are identical");
323        }
324
325        // Tables keyed by TxSequenceNumber may have different sequence numbers between
326        // async and sync post-processing. Compare the (prefix, value) pairs as sorted
327        // vectors instead of lockstep iteration.
328        // Crash recovery can re-index transactions with new sequence numbers while
329        // old entries remain, producing duplicates. Deduplicate before comparing.
330        fn assert_seq_table_equal<P, V>(
331            name: &str,
332            table_a: &DBMap<(P, TxSequenceNumber), V>,
333            table_b: &DBMap<(P, TxSequenceNumber), V>,
334        ) where
335            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
336            V: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
337        {
338            let mut entries_a: Vec<(P, V)> = table_a
339                .safe_iter()
340                .map(|r| {
341                    let ((p, _seq), v) = r.expect("failed to read from table_a");
342                    (p, v)
343                })
344                .collect();
345            let mut entries_b: Vec<(P, V)> = table_b
346                .safe_iter()
347                .map(|r| {
348                    let ((p, _seq), v) = r.expect("failed to read from table_b");
349                    (p, v)
350                })
351                .collect();
352            entries_a.sort();
353            entries_a.dedup();
354            entries_b.sort();
355            entries_b.dedup();
356            assert!(
357                entries_a.len() == entries_b.len(),
358                "{name}: different number of unique entries: {} vs {}",
359                entries_a.len(),
360                entries_b.len()
361            );
362            for (i, (a, b)) in entries_a.iter().zip_debug_eq(entries_b.iter()).enumerate() {
363                assert!(
364                    a == b,
365                    "{name}: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
366                );
367            }
368            info!(
369                "{name}: verified {} unique entries match (ignoring sequence numbers)",
370                entries_a.len()
371            );
372        }
373
374        // EventIndex contains a wall-clock timestamp that differs between nodes.
375        // Compare only (TransactionEventsDigest, TransactionDigest).
376        type EventKey = (TransactionEventsDigest, TransactionDigest);
377        fn strip_timestamp(ei: EventIndex) -> EventKey {
378            (ei.0, ei.1)
379        }
380
381        // Tables where the key contains an EventId = (TxSequenceNumber, usize). Compare
382        // the (prefix, event_key) pairs as sorted vectors, ignoring both event ids and
383        // timestamps.
384        fn assert_event_table_equal<P>(
385            name: &str,
386            table_a: &DBMap<(P, EventId), EventIndex>,
387            table_b: &DBMap<(P, EventId), EventIndex>,
388        ) where
389            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
390        {
391            let mut entries_a: Vec<(P, EventKey)> = table_a
392                .safe_iter()
393                .map(|r| {
394                    let ((p, _eid), v) = r.expect("failed to read from table_a");
395                    (p, strip_timestamp(v))
396                })
397                .collect();
398            let mut entries_b: Vec<(P, EventKey)> = table_b
399                .safe_iter()
400                .map(|r| {
401                    let ((p, _eid), v) = r.expect("failed to read from table_b");
402                    (p, strip_timestamp(v))
403                })
404                .collect();
405            entries_a.sort();
406            entries_a.dedup();
407            entries_b.sort();
408            entries_b.dedup();
409            assert!(
410                entries_a.len() == entries_b.len(),
411                "{name}: different number of unique entries: {} vs {}",
412                entries_a.len(),
413                entries_b.len()
414            );
415            for (i, (a, b)) in entries_a.iter().zip_debug_eq(entries_b.iter()).enumerate() {
416                assert!(
417                    a == b,
418                    "{name}: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
419                );
420            }
421            info!(
422                "{name}: verified {} unique entries match (ignoring event ids and timestamps)",
423                entries_a.len()
424            );
425        }
426
427        // Exact comparison — tables with no sequence numbers in keys.
428        assert_tables_equal("owner_index", &self.owner_index, &other.owner_index);
429        assert_tables_equal("coin_index_2", &self.coin_index_2, &other.coin_index_2);
430        assert_tables_equal(
431            "address_balances",
432            &self.address_balances,
433            &other.address_balances,
434        );
435        assert_tables_equal(
436            "dynamic_field_index",
437            &self.dynamic_field_index,
438            &other.dynamic_field_index,
439        );
440
441        // Transaction tables — compare (address, digest) pairs ignoring sequence numbers.
442        assert_seq_table_equal(
443            "transactions_from_addr",
444            &self.transactions_from_addr,
445            &other.transactions_from_addr,
446        );
447        assert_seq_table_equal(
448            "transactions_to_addr",
449            &self.transactions_to_addr,
450            &other.transactions_to_addr,
451        );
452        // transactions_by_move_function has a 4-tuple key: (ObjectID, String, String, TxSequenceNumber)
453        {
454            let mut entries_a: Vec<((ObjectID, String, String), TransactionDigest)> = self
455                .transactions_by_move_function
456                .safe_iter()
457                .map(|r| {
458                    let ((pkg, module, func, _seq), digest) =
459                        r.expect("failed to read from table_a");
460                    ((pkg, module, func), digest)
461                })
462                .collect();
463            let mut entries_b: Vec<((ObjectID, String, String), TransactionDigest)> = other
464                .transactions_by_move_function
465                .safe_iter()
466                .map(|r| {
467                    let ((pkg, module, func, _seq), digest) =
468                        r.expect("failed to read from table_b");
469                    ((pkg, module, func), digest)
470                })
471                .collect();
472            entries_a.sort();
473            entries_a.dedup();
474            entries_b.sort();
475            entries_b.dedup();
476            assert!(
477                entries_a.len() == entries_b.len(),
478                "transactions_by_move_function: different number of unique entries: {} vs {}",
479                entries_a.len(),
480                entries_b.len()
481            );
482            for (i, (a, b)) in entries_a.iter().zip_debug_eq(entries_b.iter()).enumerate() {
483                assert!(
484                    a == b,
485                    "transactions_by_move_function: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
486                );
487            }
488            info!(
489                "transactions_by_move_function: verified {} entries match (ignoring sequence numbers)",
490                entries_a.len()
491            );
492        }
493
494        // Event tables — compare event keys ignoring event ids and timestamps.
495        {
496            // event_order: DBMap<EventId, EventIndex> — no prefix, just compare values.
497            let mut vals_a: Vec<EventKey> = self
498                .event_order
499                .safe_iter()
500                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
501                .collect();
502            let mut vals_b: Vec<EventKey> = other
503                .event_order
504                .safe_iter()
505                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
506                .collect();
507            vals_a.sort();
508            vals_a.dedup();
509            vals_b.sort();
510            vals_b.dedup();
511            assert!(
512                vals_a.len() == vals_b.len(),
513                "event_order: different number of entries: {} vs {}",
514                vals_a.len(),
515                vals_b.len()
516            );
517            for (i, (a, b)) in vals_a.iter().zip_debug_eq(vals_b.iter()).enumerate() {
518                assert!(
519                    a == b,
520                    "event_order: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
521                );
522            }
523            info!(
524                "event_order: verified {} entries match (ignoring event ids and timestamps)",
525                vals_a.len()
526            );
527        }
528        assert_event_table_equal(
529            "event_by_move_module",
530            &self.event_by_move_module,
531            &other.event_by_move_module,
532        );
533        assert_event_table_equal(
534            "event_by_move_event",
535            &self.event_by_move_event,
536            &other.event_by_move_event,
537        );
538        assert_event_table_equal(
539            "event_by_event_module",
540            &self.event_by_event_module,
541            &other.event_by_event_module,
542        );
543        assert_event_table_equal(
544            "event_by_sender",
545            &self.event_by_sender,
546            &other.event_by_sender,
547        );
548        // event_by_time: key is (timestamp_ms, EventId) — timestamps differ between
549        // nodes, so just compare the set of EventKey values.
550        {
551            let mut vals_a: Vec<EventKey> = self
552                .event_by_time
553                .safe_iter()
554                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
555                .collect();
556            let mut vals_b: Vec<EventKey> = other
557                .event_by_time
558                .safe_iter()
559                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
560                .collect();
561            vals_a.sort();
562            vals_a.dedup();
563            vals_b.sort();
564            vals_b.dedup();
565            assert!(
566                vals_a.len() == vals_b.len(),
567                "event_by_time: different number of entries: {} vs {}",
568                vals_a.len(),
569                vals_b.len()
570            );
571            for (i, (a, b)) in vals_a.iter().zip_debug_eq(vals_b.iter()).enumerate() {
572                assert!(
573                    a == b,
574                    "event_by_time: mismatch at sorted entry {i}:\n  a={a:?}\n  b={b:?}"
575                );
576            }
577            info!(
578                "event_by_time: verified {} entries match (ignoring timestamps and event ids)",
579                vals_a.len()
580            );
581        }
582
583        // Skipped tables:
584        // - transaction_order / transactions_seq: sequence numbers differ by design
585        // - transactions_by_input_object_id / transactions_by_mutated_object_id: deprecated
586        // - meta: metadata singleton, not meaningful to compare
587        // - pruner_watermark: operational state
588    }
589
590    #[allow(deprecated)]
591    fn init(&mut self) -> Result<(), StorageError> {
592        let metadata = {
593            match self.meta.get(&()) {
594                Ok(Some(metadata)) => metadata,
595                Ok(None) | Err(_) => MetadataInfo {
596                    version: CURRENT_DB_VERSION,
597                    column_families: BTreeMap::new(),
598                },
599            }
600        };
601
602        // Commit to the DB that the indexes have been initialized
603        self.meta.insert(&(), &metadata)?;
604
605        Ok(())
606    }
607
608    pub fn get_dynamic_fields_iterator(
609        &self,
610        object: ObjectID,
611        cursor: Option<ObjectID>,
612    ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
613        debug!(?object, "get_dynamic_fields");
614        // The object id 0 is the smallest possible
615        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
616        let iter_upper_bound = (object, ObjectID::MAX);
617        self.dynamic_field_index
618            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
619            // skip an extra b/c the cursor is exclusive
620            .skip(usize::from(cursor.is_some()))
621            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
622            .map_ok(|((_, c), object_info)| (c, object_info))
623    }
624}
625
626pub struct IndexStore {
627    next_sequence_number: AtomicU64,
628    tables: IndexStoreTables,
629    pub caches: IndexStoreCaches,
630    metrics: Arc<IndexStoreMetrics>,
631    max_type_length: u64,
632    remove_deprecated_tables: bool,
633    pruner_watermark: Arc<AtomicU64>,
634}
635
636struct JsonRpcCompactionMetrics {
637    key_removed: IntCounterVec,
638    key_kept: IntCounterVec,
639    key_error: IntCounterVec,
640}
641
642impl JsonRpcCompactionMetrics {
643    pub fn new(registry: &Registry) -> Arc<Self> {
644        Arc::new(Self {
645            key_removed: register_int_counter_vec_with_registry!(
646                "json_rpc_compaction_filter_key_removed",
647                "Compaction key removed",
648                &["cf"],
649                registry
650            )
651            .unwrap(),
652            key_kept: register_int_counter_vec_with_registry!(
653                "json_rpc_compaction_filter_key_kept",
654                "Compaction key kept",
655                &["cf"],
656                registry
657            )
658            .unwrap(),
659            key_error: register_int_counter_vec_with_registry!(
660                "json_rpc_compaction_filter_key_error",
661                "Compaction error",
662                &["cf"],
663                registry
664            )
665            .unwrap(),
666        })
667    }
668}
669
670fn compaction_filter_config<T: DeserializeOwned>(
671    name: &str,
672    metrics: Arc<JsonRpcCompactionMetrics>,
673    mut db_options: DBOptions,
674    pruner_watermark: Arc<AtomicU64>,
675    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
676    by_key: bool,
677) -> DBOptions {
678    let cf = name.to_string();
679    db_options
680        .options
681        .set_compaction_filter(name, move |_, key, value| {
682            let bytes = if by_key { key } else { value };
683            let deserializer = bincode::DefaultOptions::new()
684                .with_big_endian()
685                .with_fixint_encoding();
686            match deserializer.deserialize(bytes) {
687                Ok(key_data) => {
688                    let sequence_number = extractor(key_data);
689                    if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
690                        metrics.key_removed.with_label_values(&[&cf]).inc();
691                        Decision::Remove
692                    } else {
693                        metrics.key_kept.with_label_values(&[&cf]).inc();
694                        Decision::Keep
695                    }
696                }
697                Err(_) => {
698                    metrics.key_error.with_label_values(&[&cf]).inc();
699                    Decision::Keep
700                }
701            }
702        });
703    db_options
704}
705
706fn compaction_filter_config_by_key<T: DeserializeOwned>(
707    name: &str,
708    metrics: Arc<JsonRpcCompactionMetrics>,
709    db_options: DBOptions,
710    pruner_watermark: Arc<AtomicU64>,
711    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
712) -> DBOptions {
713    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
714}
715
716fn coin_index_table_default_config() -> DBOptions {
717    default_db_options()
718        .optimize_for_write_throughput()
719        .optimize_for_read(
720            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
721        )
722        .disable_write_throttling()
723}
724
725impl IndexStore {
726    pub fn new_without_init(
727        path: PathBuf,
728        registry: &Registry,
729        max_type_length: Option<u64>,
730        remove_deprecated_tables: bool,
731    ) -> Self {
732        let db_options = default_db_options().disable_write_throttling();
733        let pruner_watermark = Arc::new(AtomicU64::new(0));
734        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
735        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
736            (
737                "transactions_from_addr".to_string(),
738                compaction_filter_config_by_key(
739                    "transactions_from_addr",
740                    compaction_metrics.clone(),
741                    db_options.clone(),
742                    pruner_watermark.clone(),
743                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
744                ),
745            ),
746            (
747                "transactions_to_addr".to_string(),
748                compaction_filter_config_by_key(
749                    "transactions_to_addr",
750                    compaction_metrics.clone(),
751                    db_options.clone(),
752                    pruner_watermark.clone(),
753                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
754                ),
755            ),
756            (
757                "transactions_by_move_function".to_string(),
758                compaction_filter_config_by_key(
759                    "transactions_by_move_function",
760                    compaction_metrics.clone(),
761                    db_options.clone(),
762                    pruner_watermark.clone(),
763                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
764                ),
765            ),
766            (
767                "transaction_order".to_string(),
768                compaction_filter_config_by_key(
769                    "transaction_order",
770                    compaction_metrics.clone(),
771                    db_options.clone(),
772                    pruner_watermark.clone(),
773                    |sequence_number: TxSequenceNumber| sequence_number,
774                ),
775            ),
776            (
777                "transactions_seq".to_string(),
778                compaction_filter_config(
779                    "transactions_seq",
780                    compaction_metrics.clone(),
781                    db_options.clone(),
782                    pruner_watermark.clone(),
783                    |sequence_number: TxSequenceNumber| sequence_number,
784                    false,
785                ),
786            ),
787            (
788                "coin_index_2".to_string(),
789                coin_index_table_default_config(),
790            ),
791            (
792                "event_order".to_string(),
793                compaction_filter_config_by_key(
794                    "event_order",
795                    compaction_metrics.clone(),
796                    db_options.clone(),
797                    pruner_watermark.clone(),
798                    |event_id: EventId| event_id.0,
799                ),
800            ),
801            (
802                "event_by_move_module".to_string(),
803                compaction_filter_config_by_key(
804                    "event_by_move_module",
805                    compaction_metrics.clone(),
806                    db_options.clone(),
807                    pruner_watermark.clone(),
808                    |(_, event_id): (ModuleId, EventId)| event_id.0,
809                ),
810            ),
811            (
812                "event_by_event_module".to_string(),
813                compaction_filter_config_by_key(
814                    "event_by_event_module",
815                    compaction_metrics.clone(),
816                    db_options.clone(),
817                    pruner_watermark.clone(),
818                    |(_, event_id): (ModuleId, EventId)| event_id.0,
819                ),
820            ),
821            (
822                "event_by_sender".to_string(),
823                compaction_filter_config_by_key(
824                    "event_by_sender",
825                    compaction_metrics.clone(),
826                    db_options.clone(),
827                    pruner_watermark.clone(),
828                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
829                ),
830            ),
831            (
832                "event_by_time".to_string(),
833                compaction_filter_config_by_key(
834                    "event_by_time",
835                    compaction_metrics.clone(),
836                    db_options.clone(),
837                    pruner_watermark.clone(),
838                    |(_, event_id): (u64, EventId)| event_id.0,
839                ),
840            ),
841        ]));
842        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
843            path,
844            MetricConf::new("index"),
845            Some(db_options.options),
846            Some(table_options),
847            remove_deprecated_tables,
848        );
849
850        let metrics = IndexStoreMetrics::new(registry);
851        let caches = IndexStoreCaches {
852            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
853            all_balances: ShardedLruCache::new(1_000_000, 1000),
854            locks: MutexTable::new(128),
855        };
856        let next_sequence_number = tables
857            .transaction_order
858            .reversed_safe_iter_with_bounds(None, None)
859            .expect("failed to initialize indexes")
860            .next()
861            .transpose()
862            .expect("failed to initialize indexes")
863            .map(|(seq, _)| seq + 1)
864            .unwrap_or(0)
865            .into();
866        let pruner_watermark_value = tables
867            .pruner_watermark
868            .get(&())
869            .expect("failed to initialize index tables")
870            .unwrap_or(0);
871        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
872
873        Self {
874            tables,
875            next_sequence_number,
876            caches,
877            metrics: Arc::new(metrics),
878            max_type_length: max_type_length.unwrap_or(128),
879            remove_deprecated_tables,
880            pruner_watermark,
881        }
882    }
883
884    pub fn new(
885        path: PathBuf,
886        registry: &Registry,
887        max_type_length: Option<u64>,
888        remove_deprecated_tables: bool,
889    ) -> Self {
890        let mut store =
891            Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
892        store.tables.init().unwrap();
893        store
894    }
895
896    pub fn tables(&self) -> &IndexStoreTables {
897        &self.tables
898    }
899
900    #[instrument(skip_all)]
901    pub fn index_coin(
902        &self,
903        digest: &TransactionDigest,
904        batch: &mut StagedBatch,
905        object_index_changes: &ObjectIndexChanges,
906        tx_coins: Option<TxCoins>,
907        acquire_locks: bool,
908    ) -> SuiResult<IndexStoreCacheUpdatesWithLocks> {
909        // In production if this code path is hit, we should expect `tx_coins` to not be None.
910        // However, in many tests today we do not distinguish validator and/or fullnode, so
911        // we gracefully exist here.
912        if tx_coins.is_none() {
913            return Ok(IndexStoreCacheUpdatesWithLocks {
914                _locks: None,
915                inner: IndexStoreCacheUpdates::default(),
916            });
917        }
918
919        let _locks = if acquire_locks {
920            let mut addresses: HashSet<SuiAddress> = HashSet::new();
921            addresses.extend(
922                object_index_changes
923                    .deleted_owners
924                    .iter()
925                    .map(|(owner, _)| *owner),
926            );
927            addresses.extend(
928                object_index_changes
929                    .new_owners
930                    .iter()
931                    .map(|((owner, _), _)| *owner),
932            );
933            Some(self.caches.locks.acquire_locks(addresses.into_iter()))
934        } else {
935            None
936        };
937
938        let (input_coins, written_coins) = tx_coins.unwrap();
939        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
940            HashMap::new();
941
942        // 1. Remove old coins from the DB by looking at the set of input coin objects
943        let coin_delete_keys = input_coins
944            .values()
945            .filter_map(|object| {
946                // only process address owned coins
947                let Owner::AddressOwner(owner) = object.owner() else {
948                    return None;
949                };
950
951                // only process coin types
952                let (coin_type, coin) = object
953                    .coin_type_maybe()
954                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
955
956                let key = CoinIndexKey2::new(
957                    *owner,
958                    coin_type.to_string(),
959                    coin.balance.value(),
960                    object.id(),
961                );
962
963                let map = balance_changes.entry(*owner).or_default();
964                let entry = map.entry(coin_type).or_insert(TotalBalance {
965                    num_coins: 0,
966                    balance: 0,
967                    address_balance: 0,
968                });
969                entry.num_coins -= 1;
970                entry.balance -= coin.balance.value() as i128;
971
972                Some(key)
973            })
974            .collect::<Vec<_>>();
975        trace!(
976            tx_digset=?digest,
977            "coin_delete_keys: {:?}",
978            coin_delete_keys,
979        );
980        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
981
982        // 2. Insert new coins, or new versions of coins, by looking at `written_coins`.
983        let coin_add_keys = written_coins
984            .values()
985            .filter_map(|object| {
986                // only process address owned coins
987                let Owner::AddressOwner(owner) = object.owner() else {
988                    return None;
989                };
990
991                // only process coin types
992                let (coin_type, coin) = object
993                    .coin_type_maybe()
994                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
995
996                let key = CoinIndexKey2::new(
997                    *owner,
998                    coin_type.to_string(),
999                    coin.balance.value(),
1000                    object.id(),
1001                );
1002                let value = CoinInfo {
1003                    version: object.version(),
1004                    digest: object.digest(),
1005                    balance: coin.balance.value(),
1006                    previous_transaction: object.previous_transaction,
1007                };
1008                let map = balance_changes.entry(*owner).or_default();
1009                let entry = map.entry(coin_type).or_insert(TotalBalance {
1010                    num_coins: 0,
1011                    balance: 0,
1012                    address_balance: 0,
1013                });
1014                entry.num_coins += 1;
1015                entry.balance += coin.balance.value() as i128;
1016
1017                Some((key, value))
1018            })
1019            .collect::<Vec<_>>();
1020        trace!(
1021            tx_digset=?digest,
1022            "coin_add_keys: {:?}",
1023            coin_add_keys,
1024        );
1025
1026        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
1027
1028        let per_coin_type_balance_changes: Vec<_> = balance_changes
1029            .iter()
1030            .flat_map(|(address, balance_map)| {
1031                balance_map.iter().map(|(type_tag, balance)| {
1032                    (
1033                        (*address, type_tag.clone()),
1034                        Ok::<TotalBalance, SuiError>(*balance),
1035                    )
1036                })
1037            })
1038            .collect();
1039        let all_balance_changes: Vec<_> = balance_changes
1040            .into_iter()
1041            .map(|(address, balance_map)| {
1042                (
1043                    address,
1044                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
1045                )
1046            })
1047            .collect();
1048        let cache_updates = IndexStoreCacheUpdatesWithLocks {
1049            _locks,
1050            inner: IndexStoreCacheUpdates {
1051                per_coin_type_balance_changes,
1052                all_balance_changes,
1053            },
1054        };
1055        Ok(cache_updates)
1056    }
1057
1058    pub fn allocate_sequence_number(&self) -> u64 {
1059        self.next_sequence_number.fetch_add(1, Ordering::SeqCst)
1060    }
1061
1062    #[instrument(skip_all)]
1063    pub fn index_tx(
1064        &self,
1065        sequence: u64,
1066        sender: SuiAddress,
1067        active_inputs: impl Iterator<Item = ObjectID>,
1068        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
1069        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
1070        events: &TransactionEvents,
1071        object_index_changes: ObjectIndexChanges,
1072        digest: &TransactionDigest,
1073        timestamp_ms: u64,
1074        tx_coins: Option<TxCoins>,
1075        accumulator_events: Vec<AccumulatorEvent>,
1076        acquire_locks: bool,
1077    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
1078        let mut batch = StagedBatch::new();
1079
1080        batch.insert_batch(
1081            &self.tables.transaction_order,
1082            std::iter::once((sequence, *digest)),
1083        )?;
1084
1085        batch.insert_batch(
1086            &self.tables.transactions_seq,
1087            std::iter::once((*digest, sequence)),
1088        )?;
1089
1090        batch.insert_batch(
1091            &self.tables.transactions_from_addr,
1092            std::iter::once(((sender, sequence), *digest)),
1093        )?;
1094
1095        #[allow(deprecated)]
1096        if !self.remove_deprecated_tables {
1097            batch.insert_batch(
1098                &self.tables.transactions_by_input_object_id,
1099                active_inputs.map(|id| ((id, sequence), *digest)),
1100            )?;
1101
1102            batch.insert_batch(
1103                &self.tables.transactions_by_mutated_object_id,
1104                mutated_objects
1105                    .clone()
1106                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
1107            )?;
1108        }
1109
1110        batch.insert_batch(
1111            &self.tables.transactions_by_move_function,
1112            move_functions
1113                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
1114        )?;
1115
1116        // objects sent to addresses and accumulator events
1117        let affected_addresses = mutated_objects
1118            .filter_map(|(_, owner)| {
1119                owner
1120                    .get_address_owner_address()
1121                    .ok()
1122                    .map(|addr| ((addr, sequence), digest))
1123            })
1124            .chain(
1125                accumulator_events
1126                    .iter()
1127                    .map(|event| ((event.write.address.address, sequence), digest)),
1128            );
1129        batch.insert_batch(&self.tables.transactions_to_addr, affected_addresses)?;
1130
1131        // Coin Index
1132        let cache_updates = self.index_coin(
1133            digest,
1134            &mut batch,
1135            &object_index_changes,
1136            tx_coins,
1137            acquire_locks,
1138        )?;
1139
1140        // update address balances index
1141        let address_balance_updates = accumulator_events.into_iter().filter_map(|event| {
1142            let ty = &event.write.address.ty;
1143            let coin_type = sui_types::balance::Balance::maybe_get_balance_type_param(ty)?;
1144            Some(((event.write.address.address, coin_type), ()))
1145        });
1146        batch.insert_batch(&self.tables.address_balances, address_balance_updates)?;
1147
1148        // Owner index
1149        batch.delete_batch(
1150            &self.tables.owner_index,
1151            object_index_changes.deleted_owners.into_iter(),
1152        )?;
1153        batch.delete_batch(
1154            &self.tables.dynamic_field_index,
1155            object_index_changes.deleted_dynamic_fields.into_iter(),
1156        )?;
1157
1158        batch.insert_batch(
1159            &self.tables.owner_index,
1160            object_index_changes.new_owners.into_iter(),
1161        )?;
1162
1163        batch.insert_batch(
1164            &self.tables.dynamic_field_index,
1165            object_index_changes.new_dynamic_fields.into_iter(),
1166        )?;
1167
1168        // events
1169        let event_digest = events.digest();
1170        batch.insert_batch(
1171            &self.tables.event_order,
1172            events
1173                .data
1174                .iter()
1175                .enumerate()
1176                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
1177        )?;
1178        batch.insert_batch(
1179            &self.tables.event_by_move_module,
1180            events
1181                .data
1182                .iter()
1183                .enumerate()
1184                .map(|(i, e)| {
1185                    (
1186                        i,
1187                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
1188                    )
1189                })
1190                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
1191        )?;
1192        batch.insert_batch(
1193            &self.tables.event_by_sender,
1194            events.data.iter().enumerate().map(|(i, e)| {
1195                (
1196                    (e.sender, (sequence, i)),
1197                    (event_digest, *digest, timestamp_ms),
1198                )
1199            }),
1200        )?;
1201        batch.insert_batch(
1202            &self.tables.event_by_move_event,
1203            events.data.iter().enumerate().map(|(i, e)| {
1204                (
1205                    (e.type_.clone(), (sequence, i)),
1206                    (event_digest, *digest, timestamp_ms),
1207                )
1208            }),
1209        )?;
1210
1211        batch.insert_batch(
1212            &self.tables.event_by_time,
1213            events.data.iter().enumerate().map(|(i, _)| {
1214                (
1215                    (timestamp_ms, (sequence, i)),
1216                    (event_digest, *digest, timestamp_ms),
1217                )
1218            }),
1219        )?;
1220
1221        batch.insert_batch(
1222            &self.tables.event_by_event_module,
1223            events.data.iter().enumerate().map(|(i, e)| {
1224                (
1225                    (
1226                        ModuleId::new(e.type_.address, e.type_.module.clone()),
1227                        (sequence, i),
1228                    ),
1229                    (event_digest, *digest, timestamp_ms),
1230                )
1231            }),
1232        )?;
1233
1234        Ok((batch, cache_updates))
1235    }
1236
1237    /// Write a combined index batch and apply cache updates.
1238    pub fn commit_index_batch(
1239        &self,
1240        batch: DBBatch,
1241        cache_updates: Vec<IndexStoreCacheUpdates>,
1242    ) -> SuiResult {
1243        let invalidate_caches =
1244            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
1245
1246        if invalidate_caches {
1247            for cu in &cache_updates {
1248                self.invalidate_per_coin_type_cache(
1249                    cu.per_coin_type_balance_changes.iter().map(|x| x.0.clone()),
1250                )?;
1251                self.invalidate_all_balance_cache(cu.all_balance_changes.iter().map(|x| x.0))?;
1252            }
1253        }
1254
1255        batch.write()?;
1256
1257        if !invalidate_caches {
1258            for cu in cache_updates {
1259                self.update_per_coin_type_cache(cu.per_coin_type_balance_changes)?;
1260                self.update_all_balance_cache(cu.all_balance_changes)?;
1261            }
1262        }
1263        Ok(())
1264    }
1265
1266    pub fn new_db_batch(&self) -> DBBatch {
1267        self.tables.transactions_from_addr.batch()
1268    }
1269
1270    pub fn next_sequence_number(&self) -> TxSequenceNumber {
1271        self.next_sequence_number.load(Ordering::SeqCst) + 1
1272    }
1273
1274    #[instrument(skip(self))]
1275    pub fn get_transactions(
1276        &self,
1277        filter: Option<TransactionFilter>,
1278        cursor: Option<TransactionDigest>,
1279        limit: Option<usize>,
1280        reverse: bool,
1281    ) -> SuiResult<Vec<TransactionDigest>> {
1282        // Lookup TransactionDigest sequence number,
1283        let cursor = if let Some(cursor) = cursor {
1284            Some(
1285                self.get_transaction_seq(&cursor)?
1286                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
1287            )
1288        } else {
1289            None
1290        };
1291        match filter {
1292            Some(TransactionFilter::MoveFunction {
1293                package,
1294                module,
1295                function,
1296            }) => Ok(self.get_transactions_by_move_function(
1297                package, module, function, cursor, limit, reverse,
1298            )?),
1299            Some(TransactionFilter::InputObject(object_id)) => {
1300                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
1301            }
1302            Some(TransactionFilter::ChangedObject(object_id)) => {
1303                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
1304            }
1305            Some(TransactionFilter::FromAddress(address)) => {
1306                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
1307            }
1308            Some(TransactionFilter::ToAddress(address)) => {
1309                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
1310            }
1311            // NOTE: filter via checkpoint sequence number is implemented in
1312            // `get_transactions` of authority.rs.
1313            Some(_) => Err(SuiErrorKind::UserInputError {
1314                error: UserInputError::Unsupported(format!("{:?}", filter)),
1315            }
1316            .into()),
1317            None => {
1318                if reverse {
1319                    let iter = self
1320                        .tables
1321                        .transaction_order
1322                        .reversed_safe_iter_with_bounds(
1323                            None,
1324                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
1325                        )?
1326                        .skip(usize::from(cursor.is_some()))
1327                        .map(|result| result.map(|(_, digest)| digest));
1328                    if let Some(limit) = limit {
1329                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
1330                    } else {
1331                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
1332                    }
1333                } else {
1334                    let iter = self
1335                        .tables
1336                        .transaction_order
1337                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
1338                        .skip(usize::from(cursor.is_some()))
1339                        .map(|result| result.map(|(_, digest)| digest));
1340                    if let Some(limit) = limit {
1341                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
1342                    } else {
1343                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
1344                    }
1345                }
1346            }
1347        }
1348    }
1349
1350    #[instrument(skip_all)]
1351    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
1352        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
1353        key: KeyT,
1354        cursor: Option<TxSequenceNumber>,
1355        limit: Option<usize>,
1356        reverse: bool,
1357    ) -> SuiResult<Vec<TransactionDigest>> {
1358        Ok(if reverse {
1359            let iter = index
1360                .reversed_safe_iter_with_bounds(
1361                    None,
1362                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
1363                )?
1364                // skip one more if exclusive cursor is Some
1365                .skip(usize::from(cursor.is_some()))
1366                .take_while(|result| {
1367                    result
1368                        .as_ref()
1369                        .map(|((id, _), _)| *id == key)
1370                        .unwrap_or(false)
1371                })
1372                .map(|result| result.map(|(_, digest)| digest));
1373            if let Some(limit) = limit {
1374                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1375            } else {
1376                iter.collect::<Result<Vec<_>, _>>()?
1377            }
1378        } else {
1379            let iter = index
1380                .safe_iter_with_bounds(
1381                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
1382                    None,
1383                )
1384                // skip one more if exclusive cursor is Some
1385                .skip(usize::from(cursor.is_some()))
1386                .map(|result| result.expect("iterator db error"))
1387                .take_while(|((id, _), _)| *id == key)
1388                .map(|(_, digest)| digest);
1389            if let Some(limit) = limit {
1390                iter.take(limit).collect()
1391            } else {
1392                iter.collect()
1393            }
1394        })
1395    }
1396
1397    #[instrument(skip(self))]
1398    pub fn get_transactions_by_input_object(
1399        &self,
1400        input_object: ObjectID,
1401        cursor: Option<TxSequenceNumber>,
1402        limit: Option<usize>,
1403        reverse: bool,
1404    ) -> SuiResult<Vec<TransactionDigest>> {
1405        if self.remove_deprecated_tables {
1406            return Ok(vec![]);
1407        }
1408        #[allow(deprecated)]
1409        Self::get_transactions_from_index(
1410            &self.tables.transactions_by_input_object_id,
1411            input_object,
1412            cursor,
1413            limit,
1414            reverse,
1415        )
1416    }
1417
1418    #[instrument(skip(self))]
1419    pub fn get_transactions_by_mutated_object(
1420        &self,
1421        mutated_object: ObjectID,
1422        cursor: Option<TxSequenceNumber>,
1423        limit: Option<usize>,
1424        reverse: bool,
1425    ) -> SuiResult<Vec<TransactionDigest>> {
1426        if self.remove_deprecated_tables {
1427            return Ok(vec![]);
1428        }
1429        #[allow(deprecated)]
1430        Self::get_transactions_from_index(
1431            &self.tables.transactions_by_mutated_object_id,
1432            mutated_object,
1433            cursor,
1434            limit,
1435            reverse,
1436        )
1437    }
1438
1439    #[instrument(skip(self))]
1440    pub fn get_transactions_from_addr(
1441        &self,
1442        addr: SuiAddress,
1443        cursor: Option<TxSequenceNumber>,
1444        limit: Option<usize>,
1445        reverse: bool,
1446    ) -> SuiResult<Vec<TransactionDigest>> {
1447        Self::get_transactions_from_index(
1448            &self.tables.transactions_from_addr,
1449            addr,
1450            cursor,
1451            limit,
1452            reverse,
1453        )
1454    }
1455
1456    #[instrument(skip(self))]
1457    pub fn get_transactions_by_move_function(
1458        &self,
1459        package: ObjectID,
1460        module: Option<String>,
1461        function: Option<String>,
1462        cursor: Option<TxSequenceNumber>,
1463        limit: Option<usize>,
1464        reverse: bool,
1465    ) -> SuiResult<Vec<TransactionDigest>> {
1466        // If we are passed a function with no module return a UserInputError
1467        if function.is_some() && module.is_none() {
1468            return Err(SuiErrorKind::UserInputError {
1469                error: UserInputError::MoveFunctionInputError(
1470                    "Cannot supply function without supplying module".to_string(),
1471                ),
1472            }
1473            .into());
1474        }
1475
1476        // We cannot have a cursor without filling out the other keys.
1477        if cursor.is_some() && (module.is_none() || function.is_none()) {
1478            return Err(SuiErrorKind::UserInputError {
1479                error: UserInputError::MoveFunctionInputError(
1480                    "Cannot supply cursor without supplying module and function".to_string(),
1481                ),
1482            }
1483            .into());
1484        }
1485
1486        let cursor_val = cursor.unwrap_or(if reverse {
1487            TxSequenceNumber::MAX
1488        } else {
1489            TxSequenceNumber::MIN
1490        });
1491
1492        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1493        let module_val = module.clone().unwrap_or(if reverse {
1494            max_string.clone()
1495        } else {
1496            "".to_string()
1497        });
1498
1499        let function_val =
1500            function
1501                .clone()
1502                .unwrap_or(if reverse { max_string } else { "".to_string() });
1503
1504        let key = (package, module_val, function_val, cursor_val);
1505        Ok(if reverse {
1506            let iter = self
1507                .tables
1508                .transactions_by_move_function
1509                .reversed_safe_iter_with_bounds(None, Some(key))?
1510                // skip one more if exclusive cursor is Some
1511                .skip(usize::from(cursor.is_some()))
1512                .take_while(|result| {
1513                    result
1514                        .as_ref()
1515                        .map(|((id, m, f, _), _)| {
1516                            *id == package
1517                                && module.as_ref().map(|x| x == m).unwrap_or(true)
1518                                && function.as_ref().map(|x| x == f).unwrap_or(true)
1519                        })
1520                        .unwrap_or(false)
1521                })
1522                .map(|result| result.map(|(_, digest)| digest));
1523            if let Some(limit) = limit {
1524                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1525            } else {
1526                iter.collect::<Result<Vec<_>, _>>()?
1527            }
1528        } else {
1529            let iter = self
1530                .tables
1531                .transactions_by_move_function
1532                .safe_iter_with_bounds(Some(key), None)
1533                .map(|result| result.expect("iterator db error"))
1534                // skip one more if exclusive cursor is Some
1535                .skip(usize::from(cursor.is_some()))
1536                .take_while(|((id, m, f, _), _)| {
1537                    *id == package
1538                        && module.as_ref().map(|x| x == m).unwrap_or(true)
1539                        && function.as_ref().map(|x| x == f).unwrap_or(true)
1540                })
1541                .map(|(_, digest)| digest);
1542            if let Some(limit) = limit {
1543                iter.take(limit).collect()
1544            } else {
1545                iter.collect()
1546            }
1547        })
1548    }
1549
1550    #[instrument(skip(self))]
1551    pub fn get_transactions_to_addr(
1552        &self,
1553        addr: SuiAddress,
1554        cursor: Option<TxSequenceNumber>,
1555        limit: Option<usize>,
1556        reverse: bool,
1557    ) -> SuiResult<Vec<TransactionDigest>> {
1558        Self::get_transactions_from_index(
1559            &self.tables.transactions_to_addr,
1560            addr,
1561            cursor,
1562            limit,
1563            reverse,
1564        )
1565    }
1566
1567    #[instrument(skip(self))]
1568    pub fn get_transaction_seq(
1569        &self,
1570        digest: &TransactionDigest,
1571    ) -> SuiResult<Option<TxSequenceNumber>> {
1572        Ok(self.tables.transactions_seq.get(digest)?)
1573    }
1574
1575    #[instrument(skip(self))]
1576    pub fn all_events(
1577        &self,
1578        tx_seq: TxSequenceNumber,
1579        event_seq: usize,
1580        limit: usize,
1581        descending: bool,
1582    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1583        Ok(if descending {
1584            self.tables
1585                .event_order
1586                .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1587                .take(limit)
1588                .map(|result| {
1589                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1590                        (digest, tx_digest, event_seq, time)
1591                    })
1592                })
1593                .collect::<Result<Vec<_>, _>>()?
1594        } else {
1595            self.tables
1596                .event_order
1597                .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1598                .take(limit)
1599                .map(|result| {
1600                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1601                        (digest, tx_digest, event_seq, time)
1602                    })
1603                })
1604                .collect::<Result<Vec<_>, _>>()?
1605        })
1606    }
1607
1608    #[instrument(skip(self))]
1609    pub fn events_by_transaction(
1610        &self,
1611        digest: &TransactionDigest,
1612        tx_seq: TxSequenceNumber,
1613        event_seq: usize,
1614        limit: usize,
1615        descending: bool,
1616    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1617        let seq = self
1618            .get_transaction_seq(digest)?
1619            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1620        Ok(if descending {
1621            self.tables
1622                .event_order
1623                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1624                .take_while(|result| {
1625                    result
1626                        .as_ref()
1627                        .map(|((tx, _), _)| tx == &seq)
1628                        .unwrap_or(false)
1629                })
1630                .take(limit)
1631                .map(|result| {
1632                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1633                        (digest, tx_digest, event_seq, time)
1634                    })
1635                })
1636                .collect::<Result<Vec<_>, _>>()?
1637        } else {
1638            self.tables
1639                .event_order
1640                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1641                .map(|result| result.expect("iterator db error"))
1642                .take_while(|((tx, _), _)| tx == &seq)
1643                .take(limit)
1644                .map(|((_, event_seq), (digest, tx_digest, time))| {
1645                    (digest, tx_digest, event_seq, time)
1646                })
1647                .collect()
1648        })
1649    }
1650
1651    #[instrument(skip_all)]
1652    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1653        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1654        key: &KeyT,
1655        tx_seq: TxSequenceNumber,
1656        event_seq: usize,
1657        limit: usize,
1658        descending: bool,
1659    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1660        Ok(if descending {
1661            index
1662                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1663                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1664                .take(limit)
1665                .map(|result| {
1666                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1667                        (digest, tx_digest, event_seq, time)
1668                    })
1669                })
1670                .collect::<Result<Vec<_>, _>>()?
1671        } else {
1672            index
1673                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1674                .map(|result| result.expect("iterator db error"))
1675                .take_while(|((m, _), _)| m == key)
1676                .take(limit)
1677                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1678                    (digest, tx_digest, event_seq, time)
1679                })
1680                .collect()
1681        })
1682    }
1683
1684    #[instrument(skip(self))]
1685    pub fn events_by_module_id(
1686        &self,
1687        module: &ModuleId,
1688        tx_seq: TxSequenceNumber,
1689        event_seq: usize,
1690        limit: usize,
1691        descending: bool,
1692    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1693        Self::get_event_from_index(
1694            &self.tables.event_by_move_module,
1695            module,
1696            tx_seq,
1697            event_seq,
1698            limit,
1699            descending,
1700        )
1701    }
1702
1703    #[instrument(skip(self))]
1704    pub fn events_by_move_event_struct_name(
1705        &self,
1706        struct_name: &StructTag,
1707        tx_seq: TxSequenceNumber,
1708        event_seq: usize,
1709        limit: usize,
1710        descending: bool,
1711    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1712        Self::get_event_from_index(
1713            &self.tables.event_by_move_event,
1714            struct_name,
1715            tx_seq,
1716            event_seq,
1717            limit,
1718            descending,
1719        )
1720    }
1721
1722    #[instrument(skip(self))]
1723    pub fn events_by_move_event_module(
1724        &self,
1725        module_id: &ModuleId,
1726        tx_seq: TxSequenceNumber,
1727        event_seq: usize,
1728        limit: usize,
1729        descending: bool,
1730    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1731        Self::get_event_from_index(
1732            &self.tables.event_by_event_module,
1733            module_id,
1734            tx_seq,
1735            event_seq,
1736            limit,
1737            descending,
1738        )
1739    }
1740
1741    #[instrument(skip(self))]
1742    pub fn events_by_sender(
1743        &self,
1744        sender: &SuiAddress,
1745        tx_seq: TxSequenceNumber,
1746        event_seq: usize,
1747        limit: usize,
1748        descending: bool,
1749    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1750        Self::get_event_from_index(
1751            &self.tables.event_by_sender,
1752            sender,
1753            tx_seq,
1754            event_seq,
1755            limit,
1756            descending,
1757        )
1758    }
1759
1760    #[instrument(skip(self))]
1761    pub fn event_iterator(
1762        &self,
1763        start_time: u64,
1764        end_time: u64,
1765        tx_seq: TxSequenceNumber,
1766        event_seq: usize,
1767        limit: usize,
1768        descending: bool,
1769    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1770        Ok(if descending {
1771            self.tables
1772                .event_by_time
1773                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1774                .take_while(|result| {
1775                    result
1776                        .as_ref()
1777                        .map(|((m, _), _)| m >= &start_time)
1778                        .unwrap_or(false)
1779                })
1780                .take(limit)
1781                .map(|result| {
1782                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1783                        (digest, tx_digest, event_seq, time)
1784                    })
1785                })
1786                .collect::<Result<Vec<_>, _>>()?
1787        } else {
1788            self.tables
1789                .event_by_time
1790                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1791                .map(|result| result.expect("iterator db error"))
1792                .take_while(|((m, _), _)| m <= &end_time)
1793                .take(limit)
1794                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1795                    (digest, tx_digest, event_seq, time)
1796                })
1797                .collect()
1798        })
1799    }
1800
1801    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1802        match self
1803            .tables
1804            .event_by_time
1805            .reversed_safe_iter_with_bounds(
1806                None,
1807                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1808            )?
1809            .next()
1810            .transpose()?
1811        {
1812            Some(((_, (watermark, _)), _)) => {
1813                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1814                    info!(
1815                        "json rpc index pruning. Watermark is {} with digest {}",
1816                        watermark, digest
1817                    );
1818                }
1819                self.pruner_watermark.store(watermark, Ordering::Relaxed);
1820                self.tables.pruner_watermark.insert(&(), &watermark)?;
1821                Ok(watermark)
1822            }
1823            None => Ok(0),
1824        }
1825    }
1826
1827    pub fn get_dynamic_fields_iterator(
1828        &self,
1829        object: ObjectID,
1830        cursor: Option<ObjectID>,
1831    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1832    {
1833        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
1834    }
1835
1836    #[instrument(skip(self))]
1837    pub fn get_dynamic_field_object_id(
1838        &self,
1839        object: ObjectID,
1840        name_type: TypeTag,
1841        name_bcs_bytes: &[u8],
1842    ) -> SuiResult<Option<ObjectID>> {
1843        debug!(?object, "get_dynamic_field_object_id");
1844        let dynamic_field_id =
1845            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1846                |e| {
1847                    SuiErrorKind::Unknown(format!(
1848                        "Unable to generate dynamic field id. Got error: {e:?}"
1849                    ))
1850                },
1851            )?;
1852
1853        if let Some(info) = self
1854            .tables
1855            .dynamic_field_index
1856            .get(&(object, dynamic_field_id))?
1857        {
1858            // info.object_id != dynamic_field_id ==> is_wrapper
1859            debug_assert!(
1860                info.object_id == dynamic_field_id
1861                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1862            );
1863            return Ok(Some(info.object_id));
1864        }
1865
1866        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1867        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1868        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1869            object,
1870            &dynamic_object_field_type,
1871            name_bcs_bytes,
1872        )
1873        .map_err(|e| {
1874            SuiErrorKind::Unknown(format!(
1875                "Unable to generate dynamic field id. Got error: {e:?}"
1876            ))
1877        })?;
1878        if let Some(info) = self
1879            .tables
1880            .dynamic_field_index
1881            .get(&(object, dynamic_object_field_id))?
1882        {
1883            return Ok(Some(info.object_id));
1884        }
1885
1886        Ok(None)
1887    }
1888
1889    #[instrument(skip(self))]
1890    pub fn get_owner_objects(
1891        &self,
1892        owner: SuiAddress,
1893        cursor: Option<ObjectID>,
1894        limit: usize,
1895        filter: Option<SuiObjectDataFilter>,
1896    ) -> SuiResult<Vec<ObjectInfo>> {
1897        let cursor = match cursor {
1898            Some(cursor) => cursor,
1899            None => ObjectID::ZERO,
1900        };
1901        Ok(self
1902            .get_owner_objects_iterator(owner, cursor, filter)?
1903            .take(limit)
1904            .collect())
1905    }
1906
1907    pub fn get_address_balance_coin_types_iter(
1908        &self,
1909        owner: SuiAddress,
1910    ) -> impl Iterator<Item = TypeTag> {
1911        let start_key = (owner, TypeTag::Bool);
1912        self.tables()
1913            .address_balances
1914            .safe_iter_with_bounds(Some(start_key), None)
1915            .map(|result| result.expect("iterator db error"))
1916            .take_while(move |(key, _)| key.0 == owner)
1917            .map(|(key, _)| key.1)
1918    }
1919
1920    pub fn get_owned_coins_iterator(
1921        coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1922        owner: SuiAddress,
1923        coin_type_tag: Option<String>,
1924    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1925        let all_coins = coin_type_tag.is_none();
1926        let starting_coin_type =
1927            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1928        let start_key =
1929            CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1930        Ok(coin_index
1931            .safe_iter_with_bounds(Some(start_key), None)
1932            .map(|result| result.expect("iterator db error"))
1933            .take_while(move |(key, _)| {
1934                if key.owner != owner {
1935                    return false;
1936                }
1937                if !all_coins && starting_coin_type != key.coin_type {
1938                    return false;
1939                }
1940                true
1941            }))
1942    }
1943
1944    pub fn get_owned_coins_iterator_with_cursor(
1945        &self,
1946        owner: SuiAddress,
1947        cursor: (String, u64, ObjectID),
1948        limit: usize,
1949        one_coin_type_only: bool,
1950    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1951        let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1952        let start_key = CoinIndexKey2::new_from_cursor(
1953            owner,
1954            starting_coin_type.clone(),
1955            inverted_balance,
1956            starting_object_id,
1957        );
1958        Ok(self
1959            .tables
1960            .coin_index_2
1961            .safe_iter_with_bounds(Some(start_key), None)
1962            .map(|result| result.expect("iterator db error"))
1963            .filter(move |(key, _)| key.object_id != starting_object_id)
1964            .enumerate()
1965            .take_while(move |(index, (key, _))| {
1966                if *index >= limit {
1967                    return false;
1968                }
1969                if key.owner != owner {
1970                    return false;
1971                }
1972                if one_coin_type_only && starting_coin_type != key.coin_type {
1973                    return false;
1974                }
1975                true
1976            })
1977            .map(|(_index, (key, info))| (key, info)))
1978    }
1979
1980    /// starting_object_id can be used to implement pagination, where a client remembers the last
1981    /// object id of each page, and use it to query the next page.
1982    pub fn get_owner_objects_iterator(
1983        &self,
1984        owner: SuiAddress,
1985        starting_object_id: ObjectID,
1986        filter: Option<SuiObjectDataFilter>,
1987    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1988        Ok(self
1989            .tables
1990            .owner_index
1991            // The object id 0 is the smallest possible
1992            .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1993            .map(|result| result.expect("iterator db error"))
1994            .skip(usize::from(starting_object_id != ObjectID::ZERO))
1995            .take_while(move |((address_owner, _), _)| address_owner == &owner)
1996            .filter(move |(_, o)| {
1997                if let Some(filter) = filter.as_ref() {
1998                    filter.matches(o)
1999                } else {
2000                    true
2001                }
2002            })
2003            .map(|(_, object_info)| object_info))
2004    }
2005
2006    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
2007        let mut batch = self.tables.owner_index.batch();
2008        batch.insert_batch(
2009            &self.tables.owner_index,
2010            object_index_changes.new_owners.into_iter(),
2011        )?;
2012        batch.insert_batch(
2013            &self.tables.dynamic_field_index,
2014            object_index_changes.new_dynamic_fields.into_iter(),
2015        )?;
2016        batch.write()?;
2017        Ok(())
2018    }
2019
2020    pub fn is_empty(&self) -> bool {
2021        self.tables.owner_index.is_empty()
2022    }
2023
2024    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
2025        // We are checkpointing the whole db
2026        self.tables
2027            .transactions_from_addr
2028            .checkpoint_db(path)
2029            .map_err(Into::into)
2030    }
2031
2032    /// This method first gets the balance from `per_coin_type_balance` cache. On a cache miss, it
2033    /// gets the balance for passed in `coin_type` from the `all_balance` cache. Only on the second
2034    /// cache miss, we go to the database (expensive) and update the cache. Notice that db read is
2035    /// done with `spawn_blocking` as that is expected to block
2036    #[instrument(skip(self))]
2037    pub fn get_coin_object_balance(
2038        &self,
2039        owner: SuiAddress,
2040        coin_type: TypeTag,
2041    ) -> SuiResult<TotalBalance> {
2042        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2043        let cloned_coin_type = coin_type.clone();
2044        let metrics_cloned = self.metrics.clone();
2045        let coin_index_cloned = self.tables.coin_index_2.clone();
2046        if force_disable_cache {
2047            return Self::get_balance_from_db(
2048                metrics_cloned,
2049                coin_index_cloned,
2050                owner,
2051                cloned_coin_type,
2052            )
2053            .map_err(|e| {
2054                SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2055                    .into()
2056            });
2057        }
2058
2059        self.metrics.balance_lookup_from_total.inc();
2060
2061        let balance = self
2062            .caches
2063            .per_coin_type_balance
2064            .get(&(owner, coin_type.clone()));
2065        if let Some(balance) = balance {
2066            return balance;
2067        }
2068        // cache miss, lookup in all balance cache
2069        let all_balance = self.caches.all_balances.get(&owner.clone());
2070        if let Some(Ok(all_balance)) = all_balance
2071            && let Some(balance) = all_balance.get(&coin_type)
2072        {
2073            return Ok(*balance);
2074        }
2075        let cloned_coin_type = coin_type.clone();
2076        let metrics_cloned = self.metrics.clone();
2077        let coin_index_cloned = self.tables.coin_index_2.clone();
2078        self.caches
2079            .per_coin_type_balance
2080            .get_with((owner, coin_type), move || {
2081                Self::get_balance_from_db(
2082                    metrics_cloned,
2083                    coin_index_cloned,
2084                    owner,
2085                    cloned_coin_type,
2086                )
2087                .map_err(|e| {
2088                    SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2089                        .into()
2090                })
2091            })
2092    }
2093
2094    /// This method gets the balance for all coin types from the `all_balance` cache. On a cache miss,
2095    /// we go to the database (expensive) and update the cache. This cache is dual purpose in the
2096    /// sense that it not only serves `get_AllBalance()` calls but is also used for serving
2097    /// `get_Balance()` queries. Notice that db read is performed with `spawn_blocking` as that is
2098    /// expected to block
2099    #[instrument(skip(self))]
2100    pub fn get_all_coin_object_balances(
2101        &self,
2102        owner: SuiAddress,
2103    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2104        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2105        let metrics_cloned = self.metrics.clone();
2106        let coin_index_cloned = self.tables.coin_index_2.clone();
2107        if force_disable_cache {
2108            return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
2109                .map_err(|e| {
2110                    SuiErrorKind::ExecutionError(format!(
2111                        "Failed to read all balance from DB: {:?}",
2112                        e
2113                    ))
2114                    .into()
2115                });
2116        }
2117
2118        self.metrics.all_balance_lookup_from_total.inc();
2119        let metrics_cloned = self.metrics.clone();
2120        let coin_index_cloned = self.tables.coin_index_2.clone();
2121        self.caches.all_balances.get_with(owner, move || {
2122            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
2123                SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
2124                    .into()
2125            })
2126        })
2127    }
2128
2129    /// Read balance for a `SuiAddress` and `CoinType` from the backend database
2130    #[instrument(skip_all)]
2131    pub fn get_balance_from_db(
2132        metrics: Arc<IndexStoreMetrics>,
2133        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2134        owner: SuiAddress,
2135        coin_type: TypeTag,
2136    ) -> SuiResult<TotalBalance> {
2137        metrics.balance_lookup_from_db.inc();
2138        let coin_type_str = coin_type.to_string();
2139        let coins =
2140            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
2141
2142        let mut balance = 0i128;
2143        let mut num_coins = 0;
2144        for (_key, coin_info) in coins {
2145            balance += coin_info.balance as i128;
2146            num_coins += 1;
2147        }
2148        Ok(TotalBalance {
2149            balance,
2150            num_coins,
2151            address_balance: 0,
2152        })
2153    }
2154
2155    /// Read all balances for a `SuiAddress` from the backend database
2156    #[instrument(skip_all)]
2157    pub fn get_all_balances_from_db(
2158        metrics: Arc<IndexStoreMetrics>,
2159        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2160        owner: SuiAddress,
2161    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2162        metrics.all_balance_lookup_from_db.inc();
2163        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
2164        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
2165            .chunk_by(|(key, _coin)| key.coin_type.clone());
2166        for (coin_type, coins) in &coins {
2167            let mut total_balance = 0i128;
2168            let mut coin_object_count = 0;
2169            for (_, coin_info) in coins {
2170                total_balance += coin_info.balance as i128;
2171                coin_object_count += 1;
2172            }
2173            let coin_type =
2174                TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
2175                    SuiErrorKind::ExecutionError(format!(
2176                        "Failed to parse event sender address: {:?}",
2177                        e
2178                    ))
2179                })?));
2180            balances.insert(
2181                coin_type,
2182                TotalBalance {
2183                    num_coins: coin_object_count,
2184                    balance: total_balance,
2185                    address_balance: 0,
2186                },
2187            );
2188        }
2189        Ok(Arc::new(balances))
2190    }
2191
2192    fn invalidate_per_coin_type_cache(
2193        &self,
2194        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
2195    ) -> SuiResult {
2196        self.caches.per_coin_type_balance.batch_invalidate(keys);
2197        Ok(())
2198    }
2199
2200    fn invalidate_all_balance_cache(
2201        &self,
2202        addresses: impl IntoIterator<Item = SuiAddress>,
2203    ) -> SuiResult {
2204        self.caches.all_balances.batch_invalidate(addresses);
2205        Ok(())
2206    }
2207
2208    fn update_per_coin_type_cache(
2209        &self,
2210        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
2211    ) -> SuiResult {
2212        self.caches
2213            .per_coin_type_balance
2214            .batch_merge(keys, Self::merge_balance);
2215        Ok(())
2216    }
2217
2218    fn merge_balance(
2219        old_balance: &SuiResult<TotalBalance>,
2220        balance_delta: &SuiResult<TotalBalance>,
2221    ) -> SuiResult<TotalBalance> {
2222        if let Ok(old_balance) = old_balance {
2223            if let Ok(balance_delta) = balance_delta {
2224                Ok(TotalBalance {
2225                    balance: old_balance.balance + balance_delta.balance,
2226                    num_coins: old_balance.num_coins + balance_delta.num_coins,
2227                    address_balance: old_balance.address_balance,
2228                })
2229            } else {
2230                balance_delta.clone()
2231            }
2232        } else {
2233            old_balance.clone()
2234        }
2235    }
2236
2237    fn update_all_balance_cache(
2238        &self,
2239        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
2240    ) -> SuiResult {
2241        self.caches
2242            .all_balances
2243            .batch_merge(keys, Self::merge_all_balance);
2244        Ok(())
2245    }
2246
2247    fn merge_all_balance(
2248        old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2249        balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2250    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2251        if let Ok(old_balance) = old_balance {
2252            if let Ok(balance_delta) = balance_delta {
2253                let mut new_balance = HashMap::new();
2254                for (key, value) in old_balance.iter() {
2255                    new_balance.insert(key.clone(), *value);
2256                }
2257                for (key, delta) in balance_delta.iter() {
2258                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
2259                        balance: 0,
2260                        num_coins: 0,
2261                        address_balance: 0,
2262                    });
2263                    let new_total = TotalBalance {
2264                        balance: old.balance + delta.balance,
2265                        num_coins: old.num_coins + delta.num_coins,
2266                        address_balance: old.address_balance,
2267                    };
2268                    new_balance.insert(key.clone(), new_total);
2269                }
2270                Ok(Arc::new(new_balance))
2271            } else {
2272                balance_delta.clone()
2273            }
2274        } else {
2275            old_balance.clone()
2276        }
2277    }
2278}
2279
2280#[cfg(test)]
2281mod tests {
2282    use super::IndexStore;
2283    use super::ObjectIndexChanges;
2284    use move_core_types::account_address::AccountAddress;
2285    use prometheus::Registry;
2286    use std::collections::BTreeMap;
2287    use std::env::temp_dir;
2288    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
2289    use sui_types::digests::TransactionDigest;
2290    use sui_types::effects::TransactionEvents;
2291    use sui_types::gas_coin::GAS;
2292    use sui_types::object;
2293    use sui_types::object::Owner;
2294
2295    #[tokio::test]
2296    async fn test_index_cache() -> anyhow::Result<()> {
2297        // This test is going to invoke `index_tx()`where 10 coins each with balance 100
2298        // are going to be added to an address. The balance is then going to be read from the db
2299        // and the cache. It should be 1000. Then, we are going to delete 3 of those coins from
2300        // the address and invoke `index_tx()` again and read balance. The balance should be 700
2301        // and verified from both db and cache.
2302        // This tests make sure we are invalidating entries in the cache and always reading latest
2303        // balance.
2304        let index_store =
2305            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
2306        let address: SuiAddress = AccountAddress::random().into();
2307        let mut written_objects = BTreeMap::new();
2308        let mut input_objects = BTreeMap::new();
2309        let mut object_map = BTreeMap::new();
2310
2311        let mut new_objects = vec![];
2312        for _i in 0..10 {
2313            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
2314            new_objects.push((
2315                (address, object.id()),
2316                ObjectInfo {
2317                    object_id: object.id(),
2318                    version: object.version(),
2319                    digest: object.digest(),
2320                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
2321                    owner: Owner::AddressOwner(address),
2322                    previous_transaction: object.previous_transaction,
2323                },
2324            ));
2325            object_map.insert(object.id(), object.clone());
2326            written_objects.insert(object.data.id(), object);
2327        }
2328        let object_index_changes = ObjectIndexChanges {
2329            deleted_owners: vec![],
2330            deleted_dynamic_fields: vec![],
2331            new_owners: new_objects,
2332            new_dynamic_fields: vec![],
2333        };
2334
2335        let tx_coins = (input_objects.clone(), written_objects.clone());
2336        let seq = index_store.allocate_sequence_number();
2337        let (raw_batch, cache_updates) = index_store.index_tx(
2338            seq,
2339            address,
2340            vec![].into_iter(),
2341            vec![].into_iter(),
2342            vec![].into_iter(),
2343            &TransactionEvents { data: vec![] },
2344            object_index_changes,
2345            &TransactionDigest::random(),
2346            1234,
2347            Some(tx_coins),
2348            vec![],
2349            false,
2350        )?;
2351        // Commit the batch so subsequent reads see the data
2352        let mut db_batch = index_store.new_db_batch();
2353        db_batch.concat(vec![raw_batch]).unwrap();
2354        index_store
2355            .commit_index_batch(db_batch, vec![cache_updates.into_inner()])
2356            .unwrap();
2357
2358        let balance_from_db = IndexStore::get_balance_from_db(
2359            index_store.metrics.clone(),
2360            index_store.tables.coin_index_2.clone(),
2361            address,
2362            GAS::type_tag(),
2363        )?;
2364        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2365        assert_eq!(balance, balance_from_db);
2366        assert_eq!(balance.balance, 1000);
2367        assert_eq!(balance.num_coins, 10);
2368
2369        let all_balance = index_store.get_all_coin_object_balances(address)?;
2370        let balance = all_balance.get(&GAS::type_tag()).unwrap();
2371        assert_eq!(*balance, balance_from_db);
2372        assert_eq!(balance.balance, 1000);
2373        assert_eq!(balance.num_coins, 10);
2374
2375        written_objects.clear();
2376        let mut deleted_objects = vec![];
2377        for (id, object) in object_map.iter().take(3) {
2378            deleted_objects.push((address, *id));
2379            input_objects.insert(*id, object.to_owned());
2380        }
2381        let object_index_changes = ObjectIndexChanges {
2382            deleted_owners: deleted_objects.clone(),
2383            deleted_dynamic_fields: vec![],
2384            new_owners: vec![],
2385            new_dynamic_fields: vec![],
2386        };
2387        let tx_coins = (input_objects, written_objects);
2388        let seq = index_store.allocate_sequence_number();
2389        let (raw_batch, cache_updates) = index_store.index_tx(
2390            seq,
2391            address,
2392            vec![].into_iter(),
2393            vec![].into_iter(),
2394            vec![].into_iter(),
2395            &TransactionEvents { data: vec![] },
2396            object_index_changes,
2397            &TransactionDigest::random(),
2398            1234,
2399            Some(tx_coins),
2400            vec![],
2401            false,
2402        )?;
2403        let mut db_batch = index_store.new_db_batch();
2404        db_batch.concat(vec![raw_batch]).unwrap();
2405        index_store
2406            .commit_index_batch(db_batch, vec![cache_updates.into_inner()])
2407            .unwrap();
2408        let balance_from_db = IndexStore::get_balance_from_db(
2409            index_store.metrics.clone(),
2410            index_store.tables.coin_index_2.clone(),
2411            address,
2412            GAS::type_tag(),
2413        )?;
2414        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2415        assert_eq!(balance, balance_from_db);
2416        assert_eq!(balance.balance, 700);
2417        assert_eq!(balance.num_coins, 7);
2418        // Invalidate per coin type balance cache and read from all balance cache to ensure
2419        // the balance matches
2420        index_store
2421            .caches
2422            .per_coin_type_balance
2423            .invalidate(&(address, GAS::type_tag()));
2424        let all_balance = index_store.get_all_coin_object_balances(address)?;
2425        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
2426        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
2427        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2428        assert_eq!(balance, balance_from_db);
2429        assert_eq!(balance.balance, 700);
2430        assert_eq!(balance.num_coins, 7);
2431
2432        Ok(())
2433    }
2434
2435    #[tokio::test]
2436    async fn test_get_transaction_by_move_function() {
2437        use sui_types::base_types::ObjectID;
2438        use typed_store::Map;
2439
2440        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
2441        let db = &index_store.tables.transactions_by_move_function;
2442        db.insert(
2443            &(
2444                ObjectID::new([1; 32]),
2445                "mod".to_string(),
2446                "f".to_string(),
2447                0,
2448            ),
2449            &[0; 32].into(),
2450        )
2451        .unwrap();
2452        db.insert(
2453            &(
2454                ObjectID::new([1; 32]),
2455                "mod".to_string(),
2456                "Z".repeat(128),
2457                0,
2458            ),
2459            &[1; 32].into(),
2460        )
2461        .unwrap();
2462        db.insert(
2463            &(
2464                ObjectID::new([1; 32]),
2465                "mod".to_string(),
2466                "f".repeat(128),
2467                0,
2468            ),
2469            &[2; 32].into(),
2470        )
2471        .unwrap();
2472        db.insert(
2473            &(
2474                ObjectID::new([1; 32]),
2475                "mod".to_string(),
2476                "z".repeat(128),
2477                0,
2478            ),
2479            &[3; 32].into(),
2480        )
2481        .unwrap();
2482
2483        let mut v = index_store
2484            .get_transactions_by_move_function(
2485                ObjectID::new([1; 32]),
2486                Some("mod".to_string()),
2487                None,
2488                None,
2489                None,
2490                false,
2491            )
2492            .unwrap();
2493        let v_rev = index_store
2494            .get_transactions_by_move_function(
2495                ObjectID::new([1; 32]),
2496                Some("mod".to_string()),
2497                None,
2498                None,
2499                None,
2500                true,
2501            )
2502            .unwrap();
2503        v.reverse();
2504        assert_eq!(v, v_rev);
2505    }
2506}