sui_core/
jsonrpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! IndexStore supports creation of various ancillary indexes of state in SuiDataStore.
5//! The main user of this data is the explorer.
6
7use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19    register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use sui_types::accumulator_event::AccumulatorEvent;
23use typed_store::TypedStoreError;
24use typed_store::rocksdb::compaction_filter::Decision;
25
26use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
27use sui_storage::mutex_table::MutexTable;
28use sui_storage::sharded_lru::ShardedLruCache;
29use sui_types::base_types::{
30    ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
31};
32use sui_types::base_types::{ObjectInfo, ObjectRef};
33use sui_types::digests::TransactionEventsDigest;
34use sui_types::dynamic_field::{self, DynamicFieldInfo};
35use sui_types::effects::TransactionEvents;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
37use sui_types::inner_temporary_store::TxCoins;
38use sui_types::object::{Object, Owner};
39use sui_types::parse_sui_struct_tag;
40use sui_types::storage::error::Error as StorageError;
41use tracing::{debug, info, instrument, trace};
42use typed_store::DBMapUtils;
43use typed_store::rocks::{
44    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
45    read_size_from_env,
46};
47use typed_store::traits::Map;
48
49type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
50
51type OwnerIndexKey = (SuiAddress, ObjectID);
52type DynamicFieldKey = (ObjectID, ObjectID);
53type EventId = (TxSequenceNumber, usize);
54type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
55type AllBalance = HashMap<TypeTag, TotalBalance>;
56
57#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
58pub struct CoinIndexKey2 {
59    pub owner: SuiAddress,
60    pub coin_type: String,
61    // the balance of the coin inverted `!coin.balance` in order to force sorting of coins to be
62    // from greatest to least
63    pub inverted_balance: u64,
64    pub object_id: ObjectID,
65}
66
67impl CoinIndexKey2 {
68    pub fn new_from_cursor(
69        owner: SuiAddress,
70        coin_type: String,
71        inverted_balance: u64,
72        object_id: ObjectID,
73    ) -> Self {
74        Self {
75            owner,
76            coin_type,
77            inverted_balance,
78            object_id,
79        }
80    }
81
82    pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
83        Self {
84            owner,
85            coin_type,
86            inverted_balance: !balance,
87            object_id,
88        }
89    }
90}
91
92const CURRENT_DB_VERSION: u64 = 0;
93const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
96struct MetadataInfo {
97    /// Version of the Database
98    version: u64,
99    /// Version of each of the column families
100    ///
101    /// This is used to version individual column families to determine if a CF needs to be
102    /// (re)initialized on startup.
103    column_families: BTreeMap<String, ColumnFamilyInfo>,
104}
105
106#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
107struct ColumnFamilyInfo {
108    version: u64,
109}
110
111pub const MAX_TX_RANGE_SIZE: u64 = 4096;
112
113pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
114const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
115const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
116const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
117
118#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
119pub struct TotalBalance {
120    pub balance: i128,
121    pub num_coins: i64,
122    pub address_balance: u64,
123}
124
125#[derive(Debug)]
126pub struct ObjectIndexChanges {
127    pub deleted_owners: Vec<OwnerIndexKey>,
128    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
129    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
130    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
131}
132
133#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
134pub struct CoinInfo {
135    pub version: SequenceNumber,
136    pub digest: ObjectDigest,
137    pub balance: u64,
138    pub previous_transaction: TransactionDigest,
139}
140
141impl CoinInfo {
142    pub fn from_object(object: &Object) -> Option<CoinInfo> {
143        object.as_coin_maybe().map(|coin| CoinInfo {
144            version: object.version(),
145            digest: object.digest(),
146            previous_transaction: object.previous_transaction,
147            balance: coin.value(),
148        })
149    }
150}
151
152pub struct IndexStoreMetrics {
153    balance_lookup_from_db: IntCounter,
154    balance_lookup_from_total: IntCounter,
155    all_balance_lookup_from_db: IntCounter,
156    all_balance_lookup_from_total: IntCounter,
157}
158
159impl IndexStoreMetrics {
160    pub fn new(registry: &Registry) -> IndexStoreMetrics {
161        Self {
162            balance_lookup_from_db: register_int_counter_with_registry!(
163                "balance_lookup_from_db",
164                "Total number of balance requests served from cache",
165                registry,
166            )
167            .unwrap(),
168            balance_lookup_from_total: register_int_counter_with_registry!(
169                "balance_lookup_from_total",
170                "Total number of balance requests served ",
171                registry,
172            )
173            .unwrap(),
174            all_balance_lookup_from_db: register_int_counter_with_registry!(
175                "all_balance_lookup_from_db",
176                "Total number of all balance requests served from cache",
177                registry,
178            )
179            .unwrap(),
180            all_balance_lookup_from_total: register_int_counter_with_registry!(
181                "all_balance_lookup_from_total",
182                "Total number of all balance requests served",
183                registry,
184            )
185            .unwrap(),
186        }
187    }
188}
189
190pub struct IndexStoreCaches {
191    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
192    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
193    pub locks: MutexTable<SuiAddress>,
194}
195
196#[derive(Default)]
197pub struct IndexStoreCacheUpdates {
198    _locks: Vec<OwnedMutexGuard<()>>,
199    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
200    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
201}
202
203#[derive(DBMapUtils)]
204pub struct IndexStoreTables {
205    /// A singleton that store metadata information on the DB.
206    ///
207    /// A few uses for this singleton:
208    /// - determining if the DB has been initialized (as some tables could still be empty post
209    ///   initialization)
210    /// - version of each column family and their respective initialization status
211    meta: DBMap<(), MetadataInfo>,
212
213    /// Index from sui address to transactions initiated by that address.
214    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
215
216    /// Index from sui address to transactions that were sent to that address.
217    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
218
219    /// Index from object id to transactions that used that object id as input.
220    #[deprecated]
221    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
222
223    /// Index from object id to transactions that modified/created that object id.
224    #[deprecated]
225    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
226
227    /// Index from package id, module and function identifier to transactions that used that moce function call as input.
228    transactions_by_move_function:
229        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,
230
231    /// Ordering of all indexed transactions.
232    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,
233
234    /// Index from transaction digest to sequence number.
235    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,
236
237    /// This is an index of object references to currently existing objects, indexed by the
238    /// composite key of the SuiAddress of their owner and the object ID of the object.
239    /// This composite index allows an efficient iterator to list all objected currently owned
240    /// by a specific user, and their object reference.
241    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,
242
243    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
244    // Simple index that just tracks the existance of an address balance for an address.
245    address_balances: DBMap<(SuiAddress, TypeTag), ()>,
246
247    /// This is an index of object references to currently existing dynamic field object, indexed by the
248    /// composite key of the object ID of their parent and the object ID of the dynamic field object.
249    /// This composite index allows an efficient iterator to list all objects currently owned
250    /// by a specific object, and their object reference.
251    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,
252
253    event_order: DBMap<EventId, EventIndex>,
254    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
255    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
256    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
257    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
258    event_by_time: DBMap<(u64, EventId), EventIndex>,
259
260    pruner_watermark: DBMap<(), TxSequenceNumber>,
261}
262
263impl IndexStoreTables {
264    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
265        &self.owner_index
266    }
267
268    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
269        &self.coin_index_2
270    }
271
272    #[allow(deprecated)]
273    fn init(&mut self) -> Result<(), StorageError> {
274        let metadata = {
275            match self.meta.get(&()) {
276                Ok(Some(metadata)) => metadata,
277                Ok(None) | Err(_) => MetadataInfo {
278                    version: CURRENT_DB_VERSION,
279                    column_families: BTreeMap::new(),
280                },
281            }
282        };
283
284        // Commit to the DB that the indexes have been initialized
285        self.meta.insert(&(), &metadata)?;
286
287        Ok(())
288    }
289
290    pub fn get_dynamic_fields_iterator(
291        &self,
292        object: ObjectID,
293        cursor: Option<ObjectID>,
294    ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
295        debug!(?object, "get_dynamic_fields");
296        // The object id 0 is the smallest possible
297        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
298        let iter_upper_bound = (object, ObjectID::MAX);
299        self.dynamic_field_index
300            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
301            // skip an extra b/c the cursor is exclusive
302            .skip(usize::from(cursor.is_some()))
303            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
304            .map_ok(|((_, c), object_info)| (c, object_info))
305    }
306}
307
308pub struct IndexStore {
309    next_sequence_number: AtomicU64,
310    tables: IndexStoreTables,
311    pub caches: IndexStoreCaches,
312    metrics: Arc<IndexStoreMetrics>,
313    max_type_length: u64,
314    remove_deprecated_tables: bool,
315    pruner_watermark: Arc<AtomicU64>,
316}
317
318struct JsonRpcCompactionMetrics {
319    key_removed: IntCounterVec,
320    key_kept: IntCounterVec,
321    key_error: IntCounterVec,
322}
323
324impl JsonRpcCompactionMetrics {
325    pub fn new(registry: &Registry) -> Arc<Self> {
326        Arc::new(Self {
327            key_removed: register_int_counter_vec_with_registry!(
328                "json_rpc_compaction_filter_key_removed",
329                "Compaction key removed",
330                &["cf"],
331                registry
332            )
333            .unwrap(),
334            key_kept: register_int_counter_vec_with_registry!(
335                "json_rpc_compaction_filter_key_kept",
336                "Compaction key kept",
337                &["cf"],
338                registry
339            )
340            .unwrap(),
341            key_error: register_int_counter_vec_with_registry!(
342                "json_rpc_compaction_filter_key_error",
343                "Compaction error",
344                &["cf"],
345                registry
346            )
347            .unwrap(),
348        })
349    }
350}
351
352fn compaction_filter_config<T: DeserializeOwned>(
353    name: &str,
354    metrics: Arc<JsonRpcCompactionMetrics>,
355    mut db_options: DBOptions,
356    pruner_watermark: Arc<AtomicU64>,
357    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
358    by_key: bool,
359) -> DBOptions {
360    let cf = name.to_string();
361    db_options
362        .options
363        .set_compaction_filter(name, move |_, key, value| {
364            let bytes = if by_key { key } else { value };
365            let deserializer = bincode::DefaultOptions::new()
366                .with_big_endian()
367                .with_fixint_encoding();
368            match deserializer.deserialize(bytes) {
369                Ok(key_data) => {
370                    let sequence_number = extractor(key_data);
371                    if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
372                        metrics.key_removed.with_label_values(&[&cf]).inc();
373                        Decision::Remove
374                    } else {
375                        metrics.key_kept.with_label_values(&[&cf]).inc();
376                        Decision::Keep
377                    }
378                }
379                Err(_) => {
380                    metrics.key_error.with_label_values(&[&cf]).inc();
381                    Decision::Keep
382                }
383            }
384        });
385    db_options
386}
387
388fn compaction_filter_config_by_key<T: DeserializeOwned>(
389    name: &str,
390    metrics: Arc<JsonRpcCompactionMetrics>,
391    db_options: DBOptions,
392    pruner_watermark: Arc<AtomicU64>,
393    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
394) -> DBOptions {
395    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
396}
397
398fn coin_index_table_default_config() -> DBOptions {
399    default_db_options()
400        .optimize_for_write_throughput()
401        .optimize_for_read(
402            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
403        )
404        .disable_write_throttling()
405}
406
407impl IndexStore {
408    pub fn new_without_init(
409        path: PathBuf,
410        registry: &Registry,
411        max_type_length: Option<u64>,
412        remove_deprecated_tables: bool,
413    ) -> Self {
414        let db_options = default_db_options().disable_write_throttling();
415        let pruner_watermark = Arc::new(AtomicU64::new(0));
416        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
417        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
418            (
419                "transactions_from_addr".to_string(),
420                compaction_filter_config_by_key(
421                    "transactions_from_addr",
422                    compaction_metrics.clone(),
423                    db_options.clone(),
424                    pruner_watermark.clone(),
425                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
426                ),
427            ),
428            (
429                "transactions_to_addr".to_string(),
430                compaction_filter_config_by_key(
431                    "transactions_to_addr",
432                    compaction_metrics.clone(),
433                    db_options.clone(),
434                    pruner_watermark.clone(),
435                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
436                ),
437            ),
438            (
439                "transactions_by_move_function".to_string(),
440                compaction_filter_config_by_key(
441                    "transactions_by_move_function",
442                    compaction_metrics.clone(),
443                    db_options.clone(),
444                    pruner_watermark.clone(),
445                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
446                ),
447            ),
448            (
449                "transaction_order".to_string(),
450                compaction_filter_config_by_key(
451                    "transaction_order",
452                    compaction_metrics.clone(),
453                    db_options.clone(),
454                    pruner_watermark.clone(),
455                    |sequence_number: TxSequenceNumber| sequence_number,
456                ),
457            ),
458            (
459                "transactions_seq".to_string(),
460                compaction_filter_config(
461                    "transactions_seq",
462                    compaction_metrics.clone(),
463                    db_options.clone(),
464                    pruner_watermark.clone(),
465                    |sequence_number: TxSequenceNumber| sequence_number,
466                    false,
467                ),
468            ),
469            (
470                "coin_index_2".to_string(),
471                coin_index_table_default_config(),
472            ),
473            (
474                "event_order".to_string(),
475                compaction_filter_config_by_key(
476                    "event_order",
477                    compaction_metrics.clone(),
478                    db_options.clone(),
479                    pruner_watermark.clone(),
480                    |event_id: EventId| event_id.0,
481                ),
482            ),
483            (
484                "event_by_move_module".to_string(),
485                compaction_filter_config_by_key(
486                    "event_by_move_module",
487                    compaction_metrics.clone(),
488                    db_options.clone(),
489                    pruner_watermark.clone(),
490                    |(_, event_id): (ModuleId, EventId)| event_id.0,
491                ),
492            ),
493            (
494                "event_by_event_module".to_string(),
495                compaction_filter_config_by_key(
496                    "event_by_event_module",
497                    compaction_metrics.clone(),
498                    db_options.clone(),
499                    pruner_watermark.clone(),
500                    |(_, event_id): (ModuleId, EventId)| event_id.0,
501                ),
502            ),
503            (
504                "event_by_sender".to_string(),
505                compaction_filter_config_by_key(
506                    "event_by_sender",
507                    compaction_metrics.clone(),
508                    db_options.clone(),
509                    pruner_watermark.clone(),
510                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
511                ),
512            ),
513            (
514                "event_by_time".to_string(),
515                compaction_filter_config_by_key(
516                    "event_by_time",
517                    compaction_metrics.clone(),
518                    db_options.clone(),
519                    pruner_watermark.clone(),
520                    |(_, event_id): (u64, EventId)| event_id.0,
521                ),
522            ),
523        ]));
524        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
525            path,
526            MetricConf::new("index"),
527            Some(db_options.options),
528            Some(table_options),
529            remove_deprecated_tables,
530        );
531
532        let metrics = IndexStoreMetrics::new(registry);
533        let caches = IndexStoreCaches {
534            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
535            all_balances: ShardedLruCache::new(1_000_000, 1000),
536            locks: MutexTable::new(128),
537        };
538        let next_sequence_number = tables
539            .transaction_order
540            .reversed_safe_iter_with_bounds(None, None)
541            .expect("failed to initialize indexes")
542            .next()
543            .transpose()
544            .expect("failed to initialize indexes")
545            .map(|(seq, _)| seq + 1)
546            .unwrap_or(0)
547            .into();
548        let pruner_watermark_value = tables
549            .pruner_watermark
550            .get(&())
551            .expect("failed to initialize index tables")
552            .unwrap_or(0);
553        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
554
555        Self {
556            tables,
557            next_sequence_number,
558            caches,
559            metrics: Arc::new(metrics),
560            max_type_length: max_type_length.unwrap_or(128),
561            remove_deprecated_tables,
562            pruner_watermark,
563        }
564    }
565
566    pub fn new(
567        path: PathBuf,
568        registry: &Registry,
569        max_type_length: Option<u64>,
570        remove_deprecated_tables: bool,
571    ) -> Self {
572        let mut store =
573            Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
574        store.tables.init().unwrap();
575        store
576    }
577
578    pub fn tables(&self) -> &IndexStoreTables {
579        &self.tables
580    }
581
582    #[instrument(skip_all)]
583    pub fn index_coin(
584        &self,
585        digest: &TransactionDigest,
586        batch: &mut DBBatch,
587        object_index_changes: &ObjectIndexChanges,
588        tx_coins: Option<TxCoins>,
589    ) -> SuiResult<IndexStoreCacheUpdates> {
590        // In production if this code path is hit, we should expect `tx_coins` to not be None.
591        // However, in many tests today we do not distinguish validator and/or fullnode, so
592        // we gracefully exist here.
593        if tx_coins.is_none() {
594            return Ok(IndexStoreCacheUpdates::default());
595        }
596        // Acquire locks on changed coin owners
597        let mut addresses: HashSet<SuiAddress> = HashSet::new();
598        addresses.extend(
599            object_index_changes
600                .deleted_owners
601                .iter()
602                .map(|(owner, _)| *owner),
603        );
604        addresses.extend(
605            object_index_changes
606                .new_owners
607                .iter()
608                .map(|((owner, _), _)| *owner),
609        );
610        let _locks = self.caches.locks.acquire_locks(addresses.into_iter());
611        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
612            HashMap::new();
613        // Index coin info
614        let (input_coins, written_coins) = tx_coins.unwrap();
615
616        // 1. Remove old coins from the DB by looking at the set of input coin objects
617        let coin_delete_keys = input_coins
618            .values()
619            .filter_map(|object| {
620                // only process address owned coins
621                let Owner::AddressOwner(owner) = object.owner() else {
622                    return None;
623                };
624
625                // only process coin types
626                let (coin_type, coin) = object
627                    .coin_type_maybe()
628                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
629
630                let key = CoinIndexKey2::new(
631                    *owner,
632                    coin_type.to_string(),
633                    coin.balance.value(),
634                    object.id(),
635                );
636
637                let map = balance_changes.entry(*owner).or_default();
638                let entry = map.entry(coin_type).or_insert(TotalBalance {
639                    num_coins: 0,
640                    balance: 0,
641                    address_balance: 0,
642                });
643                entry.num_coins -= 1;
644                entry.balance -= coin.balance.value() as i128;
645
646                Some(key)
647            })
648            .collect::<Vec<_>>();
649        trace!(
650            tx_digset=?digest,
651            "coin_delete_keys: {:?}",
652            coin_delete_keys,
653        );
654        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
655
656        // 2. Insert new coins, or new versions of coins, by looking at `written_coins`.
657        let coin_add_keys = written_coins
658            .values()
659            .filter_map(|object| {
660                // only process address owned coins
661                let Owner::AddressOwner(owner) = object.owner() else {
662                    return None;
663                };
664
665                // only process coin types
666                let (coin_type, coin) = object
667                    .coin_type_maybe()
668                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
669
670                let key = CoinIndexKey2::new(
671                    *owner,
672                    coin_type.to_string(),
673                    coin.balance.value(),
674                    object.id(),
675                );
676                let value = CoinInfo {
677                    version: object.version(),
678                    digest: object.digest(),
679                    balance: coin.balance.value(),
680                    previous_transaction: object.previous_transaction,
681                };
682                let map = balance_changes.entry(*owner).or_default();
683                let entry = map.entry(coin_type).or_insert(TotalBalance {
684                    num_coins: 0,
685                    balance: 0,
686                    address_balance: 0,
687                });
688                entry.num_coins += 1;
689                entry.balance += coin.balance.value() as i128;
690
691                Some((key, value))
692            })
693            .collect::<Vec<_>>();
694        trace!(
695            tx_digset=?digest,
696            "coin_add_keys: {:?}",
697            coin_add_keys,
698        );
699
700        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
701
702        let per_coin_type_balance_changes: Vec<_> = balance_changes
703            .iter()
704            .flat_map(|(address, balance_map)| {
705                balance_map.iter().map(|(type_tag, balance)| {
706                    (
707                        (*address, type_tag.clone()),
708                        Ok::<TotalBalance, SuiError>(*balance),
709                    )
710                })
711            })
712            .collect();
713        let all_balance_changes: Vec<_> = balance_changes
714            .into_iter()
715            .map(|(address, balance_map)| {
716                (
717                    address,
718                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
719                )
720            })
721            .collect();
722        let cache_updates = IndexStoreCacheUpdates {
723            _locks,
724            per_coin_type_balance_changes,
725            all_balance_changes,
726        };
727        Ok(cache_updates)
728    }
729
730    #[instrument(skip_all)]
731    pub fn index_tx(
732        &self,
733        sender: SuiAddress,
734        active_inputs: impl Iterator<Item = ObjectID>,
735        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
736        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
737        events: &TransactionEvents,
738        object_index_changes: ObjectIndexChanges,
739        digest: &TransactionDigest,
740        timestamp_ms: u64,
741        tx_coins: Option<TxCoins>,
742        accumulator_events: Vec<AccumulatorEvent>,
743    ) -> SuiResult<u64> {
744        let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
745        let mut batch = self.tables.transactions_from_addr.batch();
746
747        batch.insert_batch(
748            &self.tables.transaction_order,
749            std::iter::once((sequence, *digest)),
750        )?;
751
752        batch.insert_batch(
753            &self.tables.transactions_seq,
754            std::iter::once((*digest, sequence)),
755        )?;
756
757        batch.insert_batch(
758            &self.tables.transactions_from_addr,
759            std::iter::once(((sender, sequence), *digest)),
760        )?;
761
762        #[allow(deprecated)]
763        if !self.remove_deprecated_tables {
764            batch.insert_batch(
765                &self.tables.transactions_by_input_object_id,
766                active_inputs.map(|id| ((id, sequence), *digest)),
767            )?;
768
769            batch.insert_batch(
770                &self.tables.transactions_by_mutated_object_id,
771                mutated_objects
772                    .clone()
773                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
774            )?;
775        }
776
777        batch.insert_batch(
778            &self.tables.transactions_by_move_function,
779            move_functions
780                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
781        )?;
782
783        batch.insert_batch(
784            &self.tables.transactions_to_addr,
785            mutated_objects.filter_map(|(_, owner)| {
786                owner
787                    .get_address_owner_address()
788                    .ok()
789                    .map(|addr| ((addr, sequence), digest))
790            }),
791        )?;
792
793        // Coin Index
794        let cache_updates = self.index_coin(digest, &mut batch, &object_index_changes, tx_coins)?;
795
796        // update address balances index
797        let address_balance_updates = accumulator_events.into_iter().filter_map(|event| {
798            let ty = &event.write.address.ty;
799            // Only process events with Balance<T> types
800            let coin_type = sui_types::balance::Balance::maybe_get_balance_type_param(ty)?;
801            Some(((event.write.address.address, coin_type), ()))
802        });
803        batch.insert_batch(&self.tables.address_balances, address_balance_updates)?;
804
805        // Owner index
806        batch.delete_batch(
807            &self.tables.owner_index,
808            object_index_changes.deleted_owners.into_iter(),
809        )?;
810        batch.delete_batch(
811            &self.tables.dynamic_field_index,
812            object_index_changes.deleted_dynamic_fields.into_iter(),
813        )?;
814
815        batch.insert_batch(
816            &self.tables.owner_index,
817            object_index_changes.new_owners.into_iter(),
818        )?;
819
820        batch.insert_batch(
821            &self.tables.dynamic_field_index,
822            object_index_changes.new_dynamic_fields.into_iter(),
823        )?;
824
825        // events
826        let event_digest = events.digest();
827        batch.insert_batch(
828            &self.tables.event_order,
829            events
830                .data
831                .iter()
832                .enumerate()
833                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
834        )?;
835        batch.insert_batch(
836            &self.tables.event_by_move_module,
837            events
838                .data
839                .iter()
840                .enumerate()
841                .map(|(i, e)| {
842                    (
843                        i,
844                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
845                    )
846                })
847                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
848        )?;
849        batch.insert_batch(
850            &self.tables.event_by_sender,
851            events.data.iter().enumerate().map(|(i, e)| {
852                (
853                    (e.sender, (sequence, i)),
854                    (event_digest, *digest, timestamp_ms),
855                )
856            }),
857        )?;
858        batch.insert_batch(
859            &self.tables.event_by_move_event,
860            events.data.iter().enumerate().map(|(i, e)| {
861                (
862                    (e.type_.clone(), (sequence, i)),
863                    (event_digest, *digest, timestamp_ms),
864                )
865            }),
866        )?;
867
868        batch.insert_batch(
869            &self.tables.event_by_time,
870            events.data.iter().enumerate().map(|(i, _)| {
871                (
872                    (timestamp_ms, (sequence, i)),
873                    (event_digest, *digest, timestamp_ms),
874                )
875            }),
876        )?;
877
878        batch.insert_batch(
879            &self.tables.event_by_event_module,
880            events.data.iter().enumerate().map(|(i, e)| {
881                (
882                    (
883                        ModuleId::new(e.type_.address, e.type_.module.clone()),
884                        (sequence, i),
885                    ),
886                    (event_digest, *digest, timestamp_ms),
887                )
888            }),
889        )?;
890
891        let invalidate_caches =
892            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
893
894        if invalidate_caches {
895            // Invalidate cache before writing to db so we always serve latest values
896            self.invalidate_per_coin_type_cache(
897                cache_updates
898                    .per_coin_type_balance_changes
899                    .iter()
900                    .map(|x| x.0.clone()),
901            )?;
902            self.invalidate_all_balance_cache(
903                cache_updates.all_balance_changes.iter().map(|x| x.0),
904            )?;
905        }
906
907        batch.write()?;
908
909        if !invalidate_caches {
910            // We cannot update the cache before updating the db or else on failing to write to db
911            // we will update the cache twice). However, this only means cache is eventually consistent with
912            // the db (within a very short delay)
913            self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)?;
914            self.update_all_balance_cache(cache_updates.all_balance_changes)?;
915        }
916        Ok(sequence)
917    }
918
919    pub fn next_sequence_number(&self) -> TxSequenceNumber {
920        self.next_sequence_number.load(Ordering::SeqCst) + 1
921    }
922
923    #[instrument(skip(self))]
924    pub fn get_transactions(
925        &self,
926        filter: Option<TransactionFilter>,
927        cursor: Option<TransactionDigest>,
928        limit: Option<usize>,
929        reverse: bool,
930    ) -> SuiResult<Vec<TransactionDigest>> {
931        // Lookup TransactionDigest sequence number,
932        let cursor = if let Some(cursor) = cursor {
933            Some(
934                self.get_transaction_seq(&cursor)?
935                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
936            )
937        } else {
938            None
939        };
940        match filter {
941            Some(TransactionFilter::MoveFunction {
942                package,
943                module,
944                function,
945            }) => Ok(self.get_transactions_by_move_function(
946                package, module, function, cursor, limit, reverse,
947            )?),
948            Some(TransactionFilter::InputObject(object_id)) => {
949                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
950            }
951            Some(TransactionFilter::ChangedObject(object_id)) => {
952                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
953            }
954            Some(TransactionFilter::FromAddress(address)) => {
955                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
956            }
957            Some(TransactionFilter::ToAddress(address)) => {
958                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
959            }
960            // NOTE: filter via checkpoint sequence number is implemented in
961            // `get_transactions` of authority.rs.
962            Some(_) => Err(SuiErrorKind::UserInputError {
963                error: UserInputError::Unsupported(format!("{:?}", filter)),
964            }
965            .into()),
966            None => {
967                if reverse {
968                    let iter = self
969                        .tables
970                        .transaction_order
971                        .reversed_safe_iter_with_bounds(
972                            None,
973                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
974                        )?
975                        .skip(usize::from(cursor.is_some()))
976                        .map(|result| result.map(|(_, digest)| digest));
977                    if let Some(limit) = limit {
978                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
979                    } else {
980                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
981                    }
982                } else {
983                    let iter = self
984                        .tables
985                        .transaction_order
986                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
987                        .skip(usize::from(cursor.is_some()))
988                        .map(|result| result.map(|(_, digest)| digest));
989                    if let Some(limit) = limit {
990                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
991                    } else {
992                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
993                    }
994                }
995            }
996        }
997    }
998
999    #[instrument(skip_all)]
1000    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
1001        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
1002        key: KeyT,
1003        cursor: Option<TxSequenceNumber>,
1004        limit: Option<usize>,
1005        reverse: bool,
1006    ) -> SuiResult<Vec<TransactionDigest>> {
1007        Ok(if reverse {
1008            let iter = index
1009                .reversed_safe_iter_with_bounds(
1010                    None,
1011                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
1012                )?
1013                // skip one more if exclusive cursor is Some
1014                .skip(usize::from(cursor.is_some()))
1015                .take_while(|result| {
1016                    result
1017                        .as_ref()
1018                        .map(|((id, _), _)| *id == key)
1019                        .unwrap_or(false)
1020                })
1021                .map(|result| result.map(|(_, digest)| digest));
1022            if let Some(limit) = limit {
1023                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1024            } else {
1025                iter.collect::<Result<Vec<_>, _>>()?
1026            }
1027        } else {
1028            let iter = index
1029                .safe_iter_with_bounds(
1030                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
1031                    None,
1032                )
1033                // skip one more if exclusive cursor is Some
1034                .skip(usize::from(cursor.is_some()))
1035                .map(|result| result.expect("iterator db error"))
1036                .take_while(|((id, _), _)| *id == key)
1037                .map(|(_, digest)| digest);
1038            if let Some(limit) = limit {
1039                iter.take(limit).collect()
1040            } else {
1041                iter.collect()
1042            }
1043        })
1044    }
1045
1046    #[instrument(skip(self))]
1047    pub fn get_transactions_by_input_object(
1048        &self,
1049        input_object: ObjectID,
1050        cursor: Option<TxSequenceNumber>,
1051        limit: Option<usize>,
1052        reverse: bool,
1053    ) -> SuiResult<Vec<TransactionDigest>> {
1054        if self.remove_deprecated_tables {
1055            return Ok(vec![]);
1056        }
1057        #[allow(deprecated)]
1058        Self::get_transactions_from_index(
1059            &self.tables.transactions_by_input_object_id,
1060            input_object,
1061            cursor,
1062            limit,
1063            reverse,
1064        )
1065    }
1066
1067    #[instrument(skip(self))]
1068    pub fn get_transactions_by_mutated_object(
1069        &self,
1070        mutated_object: ObjectID,
1071        cursor: Option<TxSequenceNumber>,
1072        limit: Option<usize>,
1073        reverse: bool,
1074    ) -> SuiResult<Vec<TransactionDigest>> {
1075        if self.remove_deprecated_tables {
1076            return Ok(vec![]);
1077        }
1078        #[allow(deprecated)]
1079        Self::get_transactions_from_index(
1080            &self.tables.transactions_by_mutated_object_id,
1081            mutated_object,
1082            cursor,
1083            limit,
1084            reverse,
1085        )
1086    }
1087
1088    #[instrument(skip(self))]
1089    pub fn get_transactions_from_addr(
1090        &self,
1091        addr: SuiAddress,
1092        cursor: Option<TxSequenceNumber>,
1093        limit: Option<usize>,
1094        reverse: bool,
1095    ) -> SuiResult<Vec<TransactionDigest>> {
1096        Self::get_transactions_from_index(
1097            &self.tables.transactions_from_addr,
1098            addr,
1099            cursor,
1100            limit,
1101            reverse,
1102        )
1103    }
1104
1105    #[instrument(skip(self))]
1106    pub fn get_transactions_by_move_function(
1107        &self,
1108        package: ObjectID,
1109        module: Option<String>,
1110        function: Option<String>,
1111        cursor: Option<TxSequenceNumber>,
1112        limit: Option<usize>,
1113        reverse: bool,
1114    ) -> SuiResult<Vec<TransactionDigest>> {
1115        // If we are passed a function with no module return a UserInputError
1116        if function.is_some() && module.is_none() {
1117            return Err(SuiErrorKind::UserInputError {
1118                error: UserInputError::MoveFunctionInputError(
1119                    "Cannot supply function without supplying module".to_string(),
1120                ),
1121            }
1122            .into());
1123        }
1124
1125        // We cannot have a cursor without filling out the other keys.
1126        if cursor.is_some() && (module.is_none() || function.is_none()) {
1127            return Err(SuiErrorKind::UserInputError {
1128                error: UserInputError::MoveFunctionInputError(
1129                    "Cannot supply cursor without supplying module and function".to_string(),
1130                ),
1131            }
1132            .into());
1133        }
1134
1135        let cursor_val = cursor.unwrap_or(if reverse {
1136            TxSequenceNumber::MAX
1137        } else {
1138            TxSequenceNumber::MIN
1139        });
1140
1141        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1142        let module_val = module.clone().unwrap_or(if reverse {
1143            max_string.clone()
1144        } else {
1145            "".to_string()
1146        });
1147
1148        let function_val =
1149            function
1150                .clone()
1151                .unwrap_or(if reverse { max_string } else { "".to_string() });
1152
1153        let key = (package, module_val, function_val, cursor_val);
1154        Ok(if reverse {
1155            let iter = self
1156                .tables
1157                .transactions_by_move_function
1158                .reversed_safe_iter_with_bounds(None, Some(key))?
1159                // skip one more if exclusive cursor is Some
1160                .skip(usize::from(cursor.is_some()))
1161                .take_while(|result| {
1162                    result
1163                        .as_ref()
1164                        .map(|((id, m, f, _), _)| {
1165                            *id == package
1166                                && module.as_ref().map(|x| x == m).unwrap_or(true)
1167                                && function.as_ref().map(|x| x == f).unwrap_or(true)
1168                        })
1169                        .unwrap_or(false)
1170                })
1171                .map(|result| result.map(|(_, digest)| digest));
1172            if let Some(limit) = limit {
1173                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1174            } else {
1175                iter.collect::<Result<Vec<_>, _>>()?
1176            }
1177        } else {
1178            let iter = self
1179                .tables
1180                .transactions_by_move_function
1181                .safe_iter_with_bounds(Some(key), None)
1182                .map(|result| result.expect("iterator db error"))
1183                // skip one more if exclusive cursor is Some
1184                .skip(usize::from(cursor.is_some()))
1185                .take_while(|((id, m, f, _), _)| {
1186                    *id == package
1187                        && module.as_ref().map(|x| x == m).unwrap_or(true)
1188                        && function.as_ref().map(|x| x == f).unwrap_or(true)
1189                })
1190                .map(|(_, digest)| digest);
1191            if let Some(limit) = limit {
1192                iter.take(limit).collect()
1193            } else {
1194                iter.collect()
1195            }
1196        })
1197    }
1198
1199    #[instrument(skip(self))]
1200    pub fn get_transactions_to_addr(
1201        &self,
1202        addr: SuiAddress,
1203        cursor: Option<TxSequenceNumber>,
1204        limit: Option<usize>,
1205        reverse: bool,
1206    ) -> SuiResult<Vec<TransactionDigest>> {
1207        Self::get_transactions_from_index(
1208            &self.tables.transactions_to_addr,
1209            addr,
1210            cursor,
1211            limit,
1212            reverse,
1213        )
1214    }
1215
1216    #[instrument(skip(self))]
1217    pub fn get_transaction_seq(
1218        &self,
1219        digest: &TransactionDigest,
1220    ) -> SuiResult<Option<TxSequenceNumber>> {
1221        Ok(self.tables.transactions_seq.get(digest)?)
1222    }
1223
1224    #[instrument(skip(self))]
1225    pub fn all_events(
1226        &self,
1227        tx_seq: TxSequenceNumber,
1228        event_seq: usize,
1229        limit: usize,
1230        descending: bool,
1231    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1232        Ok(if descending {
1233            self.tables
1234                .event_order
1235                .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1236                .take(limit)
1237                .map(|result| {
1238                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1239                        (digest, tx_digest, event_seq, time)
1240                    })
1241                })
1242                .collect::<Result<Vec<_>, _>>()?
1243        } else {
1244            self.tables
1245                .event_order
1246                .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1247                .take(limit)
1248                .map(|result| {
1249                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1250                        (digest, tx_digest, event_seq, time)
1251                    })
1252                })
1253                .collect::<Result<Vec<_>, _>>()?
1254        })
1255    }
1256
1257    #[instrument(skip(self))]
1258    pub fn events_by_transaction(
1259        &self,
1260        digest: &TransactionDigest,
1261        tx_seq: TxSequenceNumber,
1262        event_seq: usize,
1263        limit: usize,
1264        descending: bool,
1265    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1266        let seq = self
1267            .get_transaction_seq(digest)?
1268            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1269        Ok(if descending {
1270            self.tables
1271                .event_order
1272                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1273                .take_while(|result| {
1274                    result
1275                        .as_ref()
1276                        .map(|((tx, _), _)| tx == &seq)
1277                        .unwrap_or(false)
1278                })
1279                .take(limit)
1280                .map(|result| {
1281                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1282                        (digest, tx_digest, event_seq, time)
1283                    })
1284                })
1285                .collect::<Result<Vec<_>, _>>()?
1286        } else {
1287            self.tables
1288                .event_order
1289                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1290                .map(|result| result.expect("iterator db error"))
1291                .take_while(|((tx, _), _)| tx == &seq)
1292                .take(limit)
1293                .map(|((_, event_seq), (digest, tx_digest, time))| {
1294                    (digest, tx_digest, event_seq, time)
1295                })
1296                .collect()
1297        })
1298    }
1299
1300    #[instrument(skip_all)]
1301    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1302        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1303        key: &KeyT,
1304        tx_seq: TxSequenceNumber,
1305        event_seq: usize,
1306        limit: usize,
1307        descending: bool,
1308    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1309        Ok(if descending {
1310            index
1311                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1312                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1313                .take(limit)
1314                .map(|result| {
1315                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1316                        (digest, tx_digest, event_seq, time)
1317                    })
1318                })
1319                .collect::<Result<Vec<_>, _>>()?
1320        } else {
1321            index
1322                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1323                .map(|result| result.expect("iterator db error"))
1324                .take_while(|((m, _), _)| m == key)
1325                .take(limit)
1326                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1327                    (digest, tx_digest, event_seq, time)
1328                })
1329                .collect()
1330        })
1331    }
1332
1333    #[instrument(skip(self))]
1334    pub fn events_by_module_id(
1335        &self,
1336        module: &ModuleId,
1337        tx_seq: TxSequenceNumber,
1338        event_seq: usize,
1339        limit: usize,
1340        descending: bool,
1341    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1342        Self::get_event_from_index(
1343            &self.tables.event_by_move_module,
1344            module,
1345            tx_seq,
1346            event_seq,
1347            limit,
1348            descending,
1349        )
1350    }
1351
1352    #[instrument(skip(self))]
1353    pub fn events_by_move_event_struct_name(
1354        &self,
1355        struct_name: &StructTag,
1356        tx_seq: TxSequenceNumber,
1357        event_seq: usize,
1358        limit: usize,
1359        descending: bool,
1360    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1361        Self::get_event_from_index(
1362            &self.tables.event_by_move_event,
1363            struct_name,
1364            tx_seq,
1365            event_seq,
1366            limit,
1367            descending,
1368        )
1369    }
1370
1371    #[instrument(skip(self))]
1372    pub fn events_by_move_event_module(
1373        &self,
1374        module_id: &ModuleId,
1375        tx_seq: TxSequenceNumber,
1376        event_seq: usize,
1377        limit: usize,
1378        descending: bool,
1379    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1380        Self::get_event_from_index(
1381            &self.tables.event_by_event_module,
1382            module_id,
1383            tx_seq,
1384            event_seq,
1385            limit,
1386            descending,
1387        )
1388    }
1389
1390    #[instrument(skip(self))]
1391    pub fn events_by_sender(
1392        &self,
1393        sender: &SuiAddress,
1394        tx_seq: TxSequenceNumber,
1395        event_seq: usize,
1396        limit: usize,
1397        descending: bool,
1398    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1399        Self::get_event_from_index(
1400            &self.tables.event_by_sender,
1401            sender,
1402            tx_seq,
1403            event_seq,
1404            limit,
1405            descending,
1406        )
1407    }
1408
1409    #[instrument(skip(self))]
1410    pub fn event_iterator(
1411        &self,
1412        start_time: u64,
1413        end_time: u64,
1414        tx_seq: TxSequenceNumber,
1415        event_seq: usize,
1416        limit: usize,
1417        descending: bool,
1418    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1419        Ok(if descending {
1420            self.tables
1421                .event_by_time
1422                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1423                .take_while(|result| {
1424                    result
1425                        .as_ref()
1426                        .map(|((m, _), _)| m >= &start_time)
1427                        .unwrap_or(false)
1428                })
1429                .take(limit)
1430                .map(|result| {
1431                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1432                        (digest, tx_digest, event_seq, time)
1433                    })
1434                })
1435                .collect::<Result<Vec<_>, _>>()?
1436        } else {
1437            self.tables
1438                .event_by_time
1439                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1440                .map(|result| result.expect("iterator db error"))
1441                .take_while(|((m, _), _)| m <= &end_time)
1442                .take(limit)
1443                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1444                    (digest, tx_digest, event_seq, time)
1445                })
1446                .collect()
1447        })
1448    }
1449
1450    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1451        match self
1452            .tables
1453            .event_by_time
1454            .reversed_safe_iter_with_bounds(
1455                None,
1456                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1457            )?
1458            .next()
1459            .transpose()?
1460        {
1461            Some(((_, (watermark, _)), _)) => {
1462                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1463                    info!(
1464                        "json rpc index pruning. Watermark is {} with digest {}",
1465                        watermark, digest
1466                    );
1467                }
1468                self.pruner_watermark.store(watermark, Ordering::Relaxed);
1469                self.tables.pruner_watermark.insert(&(), &watermark)?;
1470                Ok(watermark)
1471            }
1472            None => Ok(0),
1473        }
1474    }
1475
1476    pub fn get_dynamic_fields_iterator(
1477        &self,
1478        object: ObjectID,
1479        cursor: Option<ObjectID>,
1480    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1481    {
1482        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
1483    }
1484
1485    #[instrument(skip(self))]
1486    pub fn get_dynamic_field_object_id(
1487        &self,
1488        object: ObjectID,
1489        name_type: TypeTag,
1490        name_bcs_bytes: &[u8],
1491    ) -> SuiResult<Option<ObjectID>> {
1492        debug!(?object, "get_dynamic_field_object_id");
1493        let dynamic_field_id =
1494            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1495                |e| {
1496                    SuiErrorKind::Unknown(format!(
1497                        "Unable to generate dynamic field id. Got error: {e:?}"
1498                    ))
1499                },
1500            )?;
1501
1502        if let Some(info) = self
1503            .tables
1504            .dynamic_field_index
1505            .get(&(object, dynamic_field_id))?
1506        {
1507            // info.object_id != dynamic_field_id ==> is_wrapper
1508            debug_assert!(
1509                info.object_id == dynamic_field_id
1510                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1511            );
1512            return Ok(Some(info.object_id));
1513        }
1514
1515        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1516        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1517        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1518            object,
1519            &dynamic_object_field_type,
1520            name_bcs_bytes,
1521        )
1522        .map_err(|e| {
1523            SuiErrorKind::Unknown(format!(
1524                "Unable to generate dynamic field id. Got error: {e:?}"
1525            ))
1526        })?;
1527        if let Some(info) = self
1528            .tables
1529            .dynamic_field_index
1530            .get(&(object, dynamic_object_field_id))?
1531        {
1532            return Ok(Some(info.object_id));
1533        }
1534
1535        Ok(None)
1536    }
1537
1538    #[instrument(skip(self))]
1539    pub fn get_owner_objects(
1540        &self,
1541        owner: SuiAddress,
1542        cursor: Option<ObjectID>,
1543        limit: usize,
1544        filter: Option<SuiObjectDataFilter>,
1545    ) -> SuiResult<Vec<ObjectInfo>> {
1546        let cursor = match cursor {
1547            Some(cursor) => cursor,
1548            None => ObjectID::ZERO,
1549        };
1550        Ok(self
1551            .get_owner_objects_iterator(owner, cursor, filter)?
1552            .take(limit)
1553            .collect())
1554    }
1555
1556    pub fn get_address_balance_coin_types_iter(
1557        &self,
1558        owner: SuiAddress,
1559    ) -> impl Iterator<Item = TypeTag> {
1560        let start_key = (owner, TypeTag::Bool);
1561        self.tables()
1562            .address_balances
1563            .safe_iter_with_bounds(Some(start_key), None)
1564            .map(|result| result.expect("iterator db error"))
1565            .take_while(move |(key, _)| key.0 == owner)
1566            .map(|(key, _)| key.1)
1567    }
1568
1569    pub fn get_owned_coins_iterator(
1570        coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1571        owner: SuiAddress,
1572        coin_type_tag: Option<String>,
1573    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1574        let all_coins = coin_type_tag.is_none();
1575        let starting_coin_type =
1576            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1577        let start_key =
1578            CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1579        Ok(coin_index
1580            .safe_iter_with_bounds(Some(start_key), None)
1581            .map(|result| result.expect("iterator db error"))
1582            .take_while(move |(key, _)| {
1583                if key.owner != owner {
1584                    return false;
1585                }
1586                if !all_coins && starting_coin_type != key.coin_type {
1587                    return false;
1588                }
1589                true
1590            }))
1591    }
1592
1593    pub fn get_owned_coins_iterator_with_cursor(
1594        &self,
1595        owner: SuiAddress,
1596        cursor: (String, u64, ObjectID),
1597        limit: usize,
1598        one_coin_type_only: bool,
1599    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1600        let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1601        let start_key = CoinIndexKey2::new_from_cursor(
1602            owner,
1603            starting_coin_type.clone(),
1604            inverted_balance,
1605            starting_object_id,
1606        );
1607        Ok(self
1608            .tables
1609            .coin_index_2
1610            .safe_iter_with_bounds(Some(start_key), None)
1611            .map(|result| result.expect("iterator db error"))
1612            .filter(move |(key, _)| key.object_id != starting_object_id)
1613            .enumerate()
1614            .take_while(move |(index, (key, _))| {
1615                if *index >= limit {
1616                    return false;
1617                }
1618                if key.owner != owner {
1619                    return false;
1620                }
1621                if one_coin_type_only && starting_coin_type != key.coin_type {
1622                    return false;
1623                }
1624                true
1625            })
1626            .map(|(_index, (key, info))| (key, info)))
1627    }
1628
1629    /// starting_object_id can be used to implement pagination, where a client remembers the last
1630    /// object id of each page, and use it to query the next page.
1631    pub fn get_owner_objects_iterator(
1632        &self,
1633        owner: SuiAddress,
1634        starting_object_id: ObjectID,
1635        filter: Option<SuiObjectDataFilter>,
1636    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1637        Ok(self
1638            .tables
1639            .owner_index
1640            // The object id 0 is the smallest possible
1641            .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1642            .map(|result| result.expect("iterator db error"))
1643            .skip(usize::from(starting_object_id != ObjectID::ZERO))
1644            .take_while(move |((address_owner, _), _)| address_owner == &owner)
1645            .filter(move |(_, o)| {
1646                if let Some(filter) = filter.as_ref() {
1647                    filter.matches(o)
1648                } else {
1649                    true
1650                }
1651            })
1652            .map(|(_, object_info)| object_info))
1653    }
1654
1655    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
1656        let mut batch = self.tables.owner_index.batch();
1657        batch.insert_batch(
1658            &self.tables.owner_index,
1659            object_index_changes.new_owners.into_iter(),
1660        )?;
1661        batch.insert_batch(
1662            &self.tables.dynamic_field_index,
1663            object_index_changes.new_dynamic_fields.into_iter(),
1664        )?;
1665        batch.write()?;
1666        Ok(())
1667    }
1668
1669    pub fn is_empty(&self) -> bool {
1670        self.tables.owner_index.is_empty()
1671    }
1672
1673    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1674        // We are checkpointing the whole db
1675        self.tables
1676            .transactions_from_addr
1677            .checkpoint_db(path)
1678            .map_err(Into::into)
1679    }
1680
1681    /// This method first gets the balance from `per_coin_type_balance` cache. On a cache miss, it
1682    /// gets the balance for passed in `coin_type` from the `all_balance` cache. Only on the second
1683    /// cache miss, we go to the database (expensive) and update the cache. Notice that db read is
1684    /// done with `spawn_blocking` as that is expected to block
1685    #[instrument(skip(self))]
1686    pub fn get_coin_object_balance(
1687        &self,
1688        owner: SuiAddress,
1689        coin_type: TypeTag,
1690    ) -> SuiResult<TotalBalance> {
1691        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1692        let cloned_coin_type = coin_type.clone();
1693        let metrics_cloned = self.metrics.clone();
1694        let coin_index_cloned = self.tables.coin_index_2.clone();
1695        if force_disable_cache {
1696            return Self::get_balance_from_db(
1697                metrics_cloned,
1698                coin_index_cloned,
1699                owner,
1700                cloned_coin_type,
1701            )
1702            .map_err(|e| {
1703                SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1704                    .into()
1705            });
1706        }
1707
1708        self.metrics.balance_lookup_from_total.inc();
1709
1710        let balance = self
1711            .caches
1712            .per_coin_type_balance
1713            .get(&(owner, coin_type.clone()));
1714        if let Some(balance) = balance {
1715            return balance;
1716        }
1717        // cache miss, lookup in all balance cache
1718        let all_balance = self.caches.all_balances.get(&owner.clone());
1719        if let Some(Ok(all_balance)) = all_balance
1720            && let Some(balance) = all_balance.get(&coin_type)
1721        {
1722            return Ok(*balance);
1723        }
1724        let cloned_coin_type = coin_type.clone();
1725        let metrics_cloned = self.metrics.clone();
1726        let coin_index_cloned = self.tables.coin_index_2.clone();
1727        self.caches
1728            .per_coin_type_balance
1729            .get_with((owner, coin_type), move || {
1730                Self::get_balance_from_db(
1731                    metrics_cloned,
1732                    coin_index_cloned,
1733                    owner,
1734                    cloned_coin_type,
1735                )
1736                .map_err(|e| {
1737                    SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1738                        .into()
1739                })
1740            })
1741    }
1742
1743    /// This method gets the balance for all coin types from the `all_balance` cache. On a cache miss,
1744    /// we go to the database (expensive) and update the cache. This cache is dual purpose in the
1745    /// sense that it not only serves `get_AllBalance()` calls but is also used for serving
1746    /// `get_Balance()` queries. Notice that db read is performed with `spawn_blocking` as that is
1747    /// expected to block
1748    #[instrument(skip(self))]
1749    pub fn get_all_coin_object_balances(
1750        &self,
1751        owner: SuiAddress,
1752    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1753        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1754        let metrics_cloned = self.metrics.clone();
1755        let coin_index_cloned = self.tables.coin_index_2.clone();
1756        if force_disable_cache {
1757            return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1758                .map_err(|e| {
1759                    SuiErrorKind::ExecutionError(format!(
1760                        "Failed to read all balance from DB: {:?}",
1761                        e
1762                    ))
1763                    .into()
1764                });
1765        }
1766
1767        self.metrics.all_balance_lookup_from_total.inc();
1768        let metrics_cloned = self.metrics.clone();
1769        let coin_index_cloned = self.tables.coin_index_2.clone();
1770        self.caches.all_balances.get_with(owner, move || {
1771            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
1772                SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
1773                    .into()
1774            })
1775        })
1776    }
1777
1778    /// Read balance for a `SuiAddress` and `CoinType` from the backend database
1779    #[instrument(skip_all)]
1780    pub fn get_balance_from_db(
1781        metrics: Arc<IndexStoreMetrics>,
1782        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1783        owner: SuiAddress,
1784        coin_type: TypeTag,
1785    ) -> SuiResult<TotalBalance> {
1786        metrics.balance_lookup_from_db.inc();
1787        let coin_type_str = coin_type.to_string();
1788        let coins =
1789            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
1790
1791        let mut balance = 0i128;
1792        let mut num_coins = 0;
1793        for (_key, coin_info) in coins {
1794            balance += coin_info.balance as i128;
1795            num_coins += 1;
1796        }
1797        Ok(TotalBalance {
1798            balance,
1799            num_coins,
1800            address_balance: 0,
1801        })
1802    }
1803
1804    /// Read all balances for a `SuiAddress` from the backend database
1805    #[instrument(skip_all)]
1806    pub fn get_all_balances_from_db(
1807        metrics: Arc<IndexStoreMetrics>,
1808        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1809        owner: SuiAddress,
1810    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1811        metrics.all_balance_lookup_from_db.inc();
1812        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1813        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1814            .chunk_by(|(key, _coin)| key.coin_type.clone());
1815        for (coin_type, coins) in &coins {
1816            let mut total_balance = 0i128;
1817            let mut coin_object_count = 0;
1818            for (_, coin_info) in coins {
1819                total_balance += coin_info.balance as i128;
1820                coin_object_count += 1;
1821            }
1822            let coin_type =
1823                TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
1824                    SuiErrorKind::ExecutionError(format!(
1825                        "Failed to parse event sender address: {:?}",
1826                        e
1827                    ))
1828                })?));
1829            balances.insert(
1830                coin_type,
1831                TotalBalance {
1832                    num_coins: coin_object_count,
1833                    balance: total_balance,
1834                    address_balance: 0,
1835                },
1836            );
1837        }
1838        Ok(Arc::new(balances))
1839    }
1840
1841    fn invalidate_per_coin_type_cache(
1842        &self,
1843        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
1844    ) -> SuiResult {
1845        self.caches.per_coin_type_balance.batch_invalidate(keys);
1846        Ok(())
1847    }
1848
1849    fn invalidate_all_balance_cache(
1850        &self,
1851        addresses: impl IntoIterator<Item = SuiAddress>,
1852    ) -> SuiResult {
1853        self.caches.all_balances.batch_invalidate(addresses);
1854        Ok(())
1855    }
1856
1857    fn update_per_coin_type_cache(
1858        &self,
1859        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
1860    ) -> SuiResult {
1861        self.caches
1862            .per_coin_type_balance
1863            .batch_merge(keys, Self::merge_balance);
1864        Ok(())
1865    }
1866
1867    fn merge_balance(
1868        old_balance: &SuiResult<TotalBalance>,
1869        balance_delta: &SuiResult<TotalBalance>,
1870    ) -> SuiResult<TotalBalance> {
1871        if let Ok(old_balance) = old_balance {
1872            if let Ok(balance_delta) = balance_delta {
1873                Ok(TotalBalance {
1874                    balance: old_balance.balance + balance_delta.balance,
1875                    num_coins: old_balance.num_coins + balance_delta.num_coins,
1876                    address_balance: old_balance.address_balance,
1877                })
1878            } else {
1879                balance_delta.clone()
1880            }
1881        } else {
1882            old_balance.clone()
1883        }
1884    }
1885
1886    fn update_all_balance_cache(
1887        &self,
1888        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1889    ) -> SuiResult {
1890        self.caches
1891            .all_balances
1892            .batch_merge(keys, Self::merge_all_balance);
1893        Ok(())
1894    }
1895
1896    fn merge_all_balance(
1897        old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1898        balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1899    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1900        if let Ok(old_balance) = old_balance {
1901            if let Ok(balance_delta) = balance_delta {
1902                let mut new_balance = HashMap::new();
1903                for (key, value) in old_balance.iter() {
1904                    new_balance.insert(key.clone(), *value);
1905                }
1906                for (key, delta) in balance_delta.iter() {
1907                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1908                        balance: 0,
1909                        num_coins: 0,
1910                        address_balance: 0,
1911                    });
1912                    let new_total = TotalBalance {
1913                        balance: old.balance + delta.balance,
1914                        num_coins: old.num_coins + delta.num_coins,
1915                        address_balance: old.address_balance,
1916                    };
1917                    new_balance.insert(key.clone(), new_total);
1918                }
1919                Ok(Arc::new(new_balance))
1920            } else {
1921                balance_delta.clone()
1922            }
1923        } else {
1924            old_balance.clone()
1925        }
1926    }
1927}
1928
1929#[cfg(test)]
1930mod tests {
1931    use super::IndexStore;
1932    use super::ObjectIndexChanges;
1933    use move_core_types::account_address::AccountAddress;
1934    use prometheus::Registry;
1935    use std::collections::BTreeMap;
1936    use std::env::temp_dir;
1937    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
1938    use sui_types::digests::TransactionDigest;
1939    use sui_types::effects::TransactionEvents;
1940    use sui_types::gas_coin::GAS;
1941    use sui_types::object;
1942    use sui_types::object::Owner;
1943
1944    #[tokio::test]
1945    async fn test_index_cache() -> anyhow::Result<()> {
1946        // This test is going to invoke `index_tx()`where 10 coins each with balance 100
1947        // are going to be added to an address. The balance is then going to be read from the db
1948        // and the cache. It should be 1000. Then, we are going to delete 3 of those coins from
1949        // the address and invoke `index_tx()` again and read balance. The balance should be 700
1950        // and verified from both db and cache.
1951        // This tests make sure we are invalidating entries in the cache and always reading latest
1952        // balance.
1953        let index_store =
1954            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
1955        let address: SuiAddress = AccountAddress::random().into();
1956        let mut written_objects = BTreeMap::new();
1957        let mut input_objects = BTreeMap::new();
1958        let mut object_map = BTreeMap::new();
1959
1960        let mut new_objects = vec![];
1961        for _i in 0..10 {
1962            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
1963            new_objects.push((
1964                (address, object.id()),
1965                ObjectInfo {
1966                    object_id: object.id(),
1967                    version: object.version(),
1968                    digest: object.digest(),
1969                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
1970                    owner: Owner::AddressOwner(address),
1971                    previous_transaction: object.previous_transaction,
1972                },
1973            ));
1974            object_map.insert(object.id(), object.clone());
1975            written_objects.insert(object.data.id(), object);
1976        }
1977        let object_index_changes = ObjectIndexChanges {
1978            deleted_owners: vec![],
1979            deleted_dynamic_fields: vec![],
1980            new_owners: new_objects,
1981            new_dynamic_fields: vec![],
1982        };
1983
1984        let tx_coins = (input_objects.clone(), written_objects.clone());
1985        index_store.index_tx(
1986            address,
1987            vec![].into_iter(),
1988            vec![].into_iter(),
1989            vec![].into_iter(),
1990            &TransactionEvents { data: vec![] },
1991            object_index_changes,
1992            &TransactionDigest::random(),
1993            1234,
1994            Some(tx_coins),
1995            vec![],
1996        )?;
1997
1998        let balance_from_db = IndexStore::get_balance_from_db(
1999            index_store.metrics.clone(),
2000            index_store.tables.coin_index_2.clone(),
2001            address,
2002            GAS::type_tag(),
2003        )?;
2004        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2005        assert_eq!(balance, balance_from_db);
2006        assert_eq!(balance.balance, 1000);
2007        assert_eq!(balance.num_coins, 10);
2008
2009        let all_balance = index_store.get_all_coin_object_balances(address)?;
2010        let balance = all_balance.get(&GAS::type_tag()).unwrap();
2011        assert_eq!(*balance, balance_from_db);
2012        assert_eq!(balance.balance, 1000);
2013        assert_eq!(balance.num_coins, 10);
2014
2015        written_objects.clear();
2016        let mut deleted_objects = vec![];
2017        for (id, object) in object_map.iter().take(3) {
2018            deleted_objects.push((address, *id));
2019            input_objects.insert(*id, object.to_owned());
2020        }
2021        let object_index_changes = ObjectIndexChanges {
2022            deleted_owners: deleted_objects.clone(),
2023            deleted_dynamic_fields: vec![],
2024            new_owners: vec![],
2025            new_dynamic_fields: vec![],
2026        };
2027        let tx_coins = (input_objects, written_objects);
2028        index_store.index_tx(
2029            address,
2030            vec![].into_iter(),
2031            vec![].into_iter(),
2032            vec![].into_iter(),
2033            &TransactionEvents { data: vec![] },
2034            object_index_changes,
2035            &TransactionDigest::random(),
2036            1234,
2037            Some(tx_coins),
2038            vec![],
2039        )?;
2040        let balance_from_db = IndexStore::get_balance_from_db(
2041            index_store.metrics.clone(),
2042            index_store.tables.coin_index_2.clone(),
2043            address,
2044            GAS::type_tag(),
2045        )?;
2046        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2047        assert_eq!(balance, balance_from_db);
2048        assert_eq!(balance.balance, 700);
2049        assert_eq!(balance.num_coins, 7);
2050        // Invalidate per coin type balance cache and read from all balance cache to ensure
2051        // the balance matches
2052        index_store
2053            .caches
2054            .per_coin_type_balance
2055            .invalidate(&(address, GAS::type_tag()));
2056        let all_balance = index_store.get_all_coin_object_balances(address)?;
2057        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
2058        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
2059        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2060        assert_eq!(balance, balance_from_db);
2061        assert_eq!(balance.balance, 700);
2062        assert_eq!(balance.num_coins, 7);
2063
2064        Ok(())
2065    }
2066
2067    #[tokio::test]
2068    async fn test_get_transaction_by_move_function() {
2069        use sui_types::base_types::ObjectID;
2070        use typed_store::Map;
2071
2072        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
2073        let db = &index_store.tables.transactions_by_move_function;
2074        db.insert(
2075            &(
2076                ObjectID::new([1; 32]),
2077                "mod".to_string(),
2078                "f".to_string(),
2079                0,
2080            ),
2081            &[0; 32].into(),
2082        )
2083        .unwrap();
2084        db.insert(
2085            &(
2086                ObjectID::new([1; 32]),
2087                "mod".to_string(),
2088                "Z".repeat(128),
2089                0,
2090            ),
2091            &[1; 32].into(),
2092        )
2093        .unwrap();
2094        db.insert(
2095            &(
2096                ObjectID::new([1; 32]),
2097                "mod".to_string(),
2098                "f".repeat(128),
2099                0,
2100            ),
2101            &[2; 32].into(),
2102        )
2103        .unwrap();
2104        db.insert(
2105            &(
2106                ObjectID::new([1; 32]),
2107                "mod".to_string(),
2108                "z".repeat(128),
2109                0,
2110            ),
2111            &[3; 32].into(),
2112        )
2113        .unwrap();
2114
2115        let mut v = index_store
2116            .get_transactions_by_move_function(
2117                ObjectID::new([1; 32]),
2118                Some("mod".to_string()),
2119                None,
2120                None,
2121                None,
2122                false,
2123            )
2124            .unwrap();
2125        let v_rev = index_store
2126            .get_transactions_by_move_function(
2127                ObjectID::new([1; 32]),
2128                Some("mod".to_string()),
2129                None,
2130                None,
2131                None,
2132                true,
2133            )
2134            .unwrap();
2135        v.reverse();
2136        assert_eq!(v, v_rev);
2137    }
2138}