sui_core/
jsonrpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! IndexStore supports creation of various ancillary indexes of state in SuiDataStore.
5//! The main user of this data is the explorer.
6
7use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19    register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use typed_store::TypedStoreError;
23use typed_store::rocksdb::compaction_filter::Decision;
24
25use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
26use sui_storage::mutex_table::MutexTable;
27use sui_storage::sharded_lru::ShardedLruCache;
28use sui_types::base_types::{
29    ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
30};
31use sui_types::base_types::{ObjectInfo, ObjectRef};
32use sui_types::digests::TransactionEventsDigest;
33use sui_types::dynamic_field::{self, DynamicFieldInfo};
34use sui_types::effects::TransactionEvents;
35use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
36use sui_types::inner_temporary_store::TxCoins;
37use sui_types::object::{Object, Owner};
38use sui_types::parse_sui_struct_tag;
39use sui_types::storage::error::Error as StorageError;
40use tracing::{debug, info, instrument, trace};
41use typed_store::DBMapUtils;
42use typed_store::rocks::{
43    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
44    read_size_from_env,
45};
46use typed_store::traits::Map;
47
48type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
49
50type OwnerIndexKey = (SuiAddress, ObjectID);
51type DynamicFieldKey = (ObjectID, ObjectID);
52type EventId = (TxSequenceNumber, usize);
53type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
54type AllBalance = HashMap<TypeTag, TotalBalance>;
55
56#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
57pub struct CoinIndexKey2 {
58    pub owner: SuiAddress,
59    pub coin_type: String,
60    // the balance of the coin inverted `!coin.balance` in order to force sorting of coins to be
61    // from greatest to least
62    pub inverted_balance: u64,
63    pub object_id: ObjectID,
64}
65
66impl CoinIndexKey2 {
67    pub fn new_from_cursor(
68        owner: SuiAddress,
69        coin_type: String,
70        inverted_balance: u64,
71        object_id: ObjectID,
72    ) -> Self {
73        Self {
74            owner,
75            coin_type,
76            inverted_balance,
77            object_id,
78        }
79    }
80
81    pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
82        Self {
83            owner,
84            coin_type,
85            inverted_balance: !balance,
86            object_id,
87        }
88    }
89}
90
91const CURRENT_DB_VERSION: u64 = 0;
92const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
93
94#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
95struct MetadataInfo {
96    /// Version of the Database
97    version: u64,
98    /// Version of each of the column families
99    ///
100    /// This is used to version individual column families to determine if a CF needs to be
101    /// (re)initialized on startup.
102    column_families: BTreeMap<String, ColumnFamilyInfo>,
103}
104
105#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
106struct ColumnFamilyInfo {
107    version: u64,
108}
109
110pub const MAX_TX_RANGE_SIZE: u64 = 4096;
111
112pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
113const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
114const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
115const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
116
117#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
118pub struct TotalBalance {
119    pub balance: i128,
120    pub num_coins: i64,
121}
122
123#[derive(Debug)]
124pub struct ObjectIndexChanges {
125    pub deleted_owners: Vec<OwnerIndexKey>,
126    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
127    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
128    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
129}
130
131#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
132pub struct CoinInfo {
133    pub version: SequenceNumber,
134    pub digest: ObjectDigest,
135    pub balance: u64,
136    pub previous_transaction: TransactionDigest,
137}
138
139impl CoinInfo {
140    pub fn from_object(object: &Object) -> Option<CoinInfo> {
141        object.as_coin_maybe().map(|coin| CoinInfo {
142            version: object.version(),
143            digest: object.digest(),
144            previous_transaction: object.previous_transaction,
145            balance: coin.value(),
146        })
147    }
148}
149
150pub struct IndexStoreMetrics {
151    balance_lookup_from_db: IntCounter,
152    balance_lookup_from_total: IntCounter,
153    all_balance_lookup_from_db: IntCounter,
154    all_balance_lookup_from_total: IntCounter,
155}
156
157impl IndexStoreMetrics {
158    pub fn new(registry: &Registry) -> IndexStoreMetrics {
159        Self {
160            balance_lookup_from_db: register_int_counter_with_registry!(
161                "balance_lookup_from_db",
162                "Total number of balance requests served from cache",
163                registry,
164            )
165            .unwrap(),
166            balance_lookup_from_total: register_int_counter_with_registry!(
167                "balance_lookup_from_total",
168                "Total number of balance requests served ",
169                registry,
170            )
171            .unwrap(),
172            all_balance_lookup_from_db: register_int_counter_with_registry!(
173                "all_balance_lookup_from_db",
174                "Total number of all balance requests served from cache",
175                registry,
176            )
177            .unwrap(),
178            all_balance_lookup_from_total: register_int_counter_with_registry!(
179                "all_balance_lookup_from_total",
180                "Total number of all balance requests served",
181                registry,
182            )
183            .unwrap(),
184        }
185    }
186}
187
188pub struct IndexStoreCaches {
189    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
190    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
191    pub locks: MutexTable<SuiAddress>,
192}
193
194#[derive(Default)]
195pub struct IndexStoreCacheUpdates {
196    _locks: Vec<OwnedMutexGuard<()>>,
197    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
198    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
199}
200
201#[derive(DBMapUtils)]
202pub struct IndexStoreTables {
203    /// A singleton that store metadata information on the DB.
204    ///
205    /// A few uses for this singleton:
206    /// - determining if the DB has been initialized (as some tables could still be empty post
207    ///   initialization)
208    /// - version of each column family and their respective initialization status
209    meta: DBMap<(), MetadataInfo>,
210
211    /// Index from sui address to transactions initiated by that address.
212    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
213
214    /// Index from sui address to transactions that were sent to that address.
215    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
216
217    /// Index from object id to transactions that used that object id as input.
218    #[deprecated]
219    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
220
221    /// Index from object id to transactions that modified/created that object id.
222    #[deprecated]
223    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
224
225    /// Index from package id, module and function identifier to transactions that used that moce function call as input.
226    transactions_by_move_function:
227        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,
228
229    /// Ordering of all indexed transactions.
230    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,
231
232    /// Index from transaction digest to sequence number.
233    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,
234
235    /// This is an index of object references to currently existing objects, indexed by the
236    /// composite key of the SuiAddress of their owner and the object ID of the object.
237    /// This composite index allows an efficient iterator to list all objected currently owned
238    /// by a specific user, and their object reference.
239    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,
240
241    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
242
243    /// This is an index of object references to currently existing dynamic field object, indexed by the
244    /// composite key of the object ID of their parent and the object ID of the dynamic field object.
245    /// This composite index allows an efficient iterator to list all objects currently owned
246    /// by a specific object, and their object reference.
247    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,
248
249    event_order: DBMap<EventId, EventIndex>,
250    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
251    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
252    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
253    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
254    event_by_time: DBMap<(u64, EventId), EventIndex>,
255
256    pruner_watermark: DBMap<(), TxSequenceNumber>,
257}
258
259impl IndexStoreTables {
260    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
261        &self.owner_index
262    }
263
264    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
265        &self.coin_index_2
266    }
267
268    #[allow(deprecated)]
269    fn init(&mut self) -> Result<(), StorageError> {
270        let metadata = {
271            match self.meta.get(&()) {
272                Ok(Some(metadata)) => metadata,
273                Ok(None) | Err(_) => MetadataInfo {
274                    version: CURRENT_DB_VERSION,
275                    column_families: BTreeMap::new(),
276                },
277            }
278        };
279
280        // Commit to the DB that the indexes have been initialized
281        self.meta.insert(&(), &metadata)?;
282
283        Ok(())
284    }
285
286    pub fn get_dynamic_fields_iterator(
287        &self,
288        object: ObjectID,
289        cursor: Option<ObjectID>,
290    ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
291        debug!(?object, "get_dynamic_fields");
292        // The object id 0 is the smallest possible
293        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
294        let iter_upper_bound = (object, ObjectID::MAX);
295        self.dynamic_field_index
296            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
297            // skip an extra b/c the cursor is exclusive
298            .skip(usize::from(cursor.is_some()))
299            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
300            .map_ok(|((_, c), object_info)| (c, object_info))
301    }
302}
303
304pub struct IndexStore {
305    next_sequence_number: AtomicU64,
306    tables: IndexStoreTables,
307    pub caches: IndexStoreCaches,
308    metrics: Arc<IndexStoreMetrics>,
309    max_type_length: u64,
310    remove_deprecated_tables: bool,
311    pruner_watermark: Arc<AtomicU64>,
312}
313
314struct JsonRpcCompactionMetrics {
315    key_removed: IntCounterVec,
316    key_kept: IntCounterVec,
317    key_error: IntCounterVec,
318}
319
320impl JsonRpcCompactionMetrics {
321    pub fn new(registry: &Registry) -> Arc<Self> {
322        Arc::new(Self {
323            key_removed: register_int_counter_vec_with_registry!(
324                "json_rpc_compaction_filter_key_removed",
325                "Compaction key removed",
326                &["cf"],
327                registry
328            )
329            .unwrap(),
330            key_kept: register_int_counter_vec_with_registry!(
331                "json_rpc_compaction_filter_key_kept",
332                "Compaction key kept",
333                &["cf"],
334                registry
335            )
336            .unwrap(),
337            key_error: register_int_counter_vec_with_registry!(
338                "json_rpc_compaction_filter_key_error",
339                "Compaction error",
340                &["cf"],
341                registry
342            )
343            .unwrap(),
344        })
345    }
346}
347
348fn compaction_filter_config<T: DeserializeOwned>(
349    name: &str,
350    metrics: Arc<JsonRpcCompactionMetrics>,
351    mut db_options: DBOptions,
352    pruner_watermark: Arc<AtomicU64>,
353    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
354    by_key: bool,
355) -> DBOptions {
356    let cf = name.to_string();
357    db_options
358        .options
359        .set_compaction_filter(name, move |_, key, value| {
360            let bytes = if by_key { key } else { value };
361            let deserializer = bincode::DefaultOptions::new()
362                .with_big_endian()
363                .with_fixint_encoding();
364            match deserializer.deserialize(bytes) {
365                Ok(key_data) => {
366                    let sequence_number = extractor(key_data);
367                    if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
368                        metrics.key_removed.with_label_values(&[&cf]).inc();
369                        Decision::Remove
370                    } else {
371                        metrics.key_kept.with_label_values(&[&cf]).inc();
372                        Decision::Keep
373                    }
374                }
375                Err(_) => {
376                    metrics.key_error.with_label_values(&[&cf]).inc();
377                    Decision::Keep
378                }
379            }
380        });
381    db_options
382}
383
384fn compaction_filter_config_by_key<T: DeserializeOwned>(
385    name: &str,
386    metrics: Arc<JsonRpcCompactionMetrics>,
387    db_options: DBOptions,
388    pruner_watermark: Arc<AtomicU64>,
389    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
390) -> DBOptions {
391    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
392}
393
394fn coin_index_table_default_config() -> DBOptions {
395    default_db_options()
396        .optimize_for_write_throughput()
397        .optimize_for_read(
398            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
399        )
400        .disable_write_throttling()
401}
402
403impl IndexStore {
404    pub fn new_without_init(
405        path: PathBuf,
406        registry: &Registry,
407        max_type_length: Option<u64>,
408        remove_deprecated_tables: bool,
409    ) -> Self {
410        let db_options = default_db_options().disable_write_throttling();
411        let pruner_watermark = Arc::new(AtomicU64::new(0));
412        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
413        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
414            (
415                "transactions_from_addr".to_string(),
416                compaction_filter_config_by_key(
417                    "transactions_from_addr",
418                    compaction_metrics.clone(),
419                    db_options.clone(),
420                    pruner_watermark.clone(),
421                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
422                ),
423            ),
424            (
425                "transactions_to_addr".to_string(),
426                compaction_filter_config_by_key(
427                    "transactions_to_addr",
428                    compaction_metrics.clone(),
429                    db_options.clone(),
430                    pruner_watermark.clone(),
431                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
432                ),
433            ),
434            (
435                "transactions_by_move_function".to_string(),
436                compaction_filter_config_by_key(
437                    "transactions_by_move_function",
438                    compaction_metrics.clone(),
439                    db_options.clone(),
440                    pruner_watermark.clone(),
441                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
442                ),
443            ),
444            (
445                "transaction_order".to_string(),
446                compaction_filter_config_by_key(
447                    "transaction_order",
448                    compaction_metrics.clone(),
449                    db_options.clone(),
450                    pruner_watermark.clone(),
451                    |sequence_number: TxSequenceNumber| sequence_number,
452                ),
453            ),
454            (
455                "transactions_seq".to_string(),
456                compaction_filter_config(
457                    "transactions_seq",
458                    compaction_metrics.clone(),
459                    db_options.clone(),
460                    pruner_watermark.clone(),
461                    |sequence_number: TxSequenceNumber| sequence_number,
462                    false,
463                ),
464            ),
465            (
466                "coin_index_2".to_string(),
467                coin_index_table_default_config(),
468            ),
469            (
470                "event_order".to_string(),
471                compaction_filter_config_by_key(
472                    "event_order",
473                    compaction_metrics.clone(),
474                    db_options.clone(),
475                    pruner_watermark.clone(),
476                    |event_id: EventId| event_id.0,
477                ),
478            ),
479            (
480                "event_by_move_module".to_string(),
481                compaction_filter_config_by_key(
482                    "event_by_move_module",
483                    compaction_metrics.clone(),
484                    db_options.clone(),
485                    pruner_watermark.clone(),
486                    |(_, event_id): (ModuleId, EventId)| event_id.0,
487                ),
488            ),
489            (
490                "event_by_event_module".to_string(),
491                compaction_filter_config_by_key(
492                    "event_by_event_module",
493                    compaction_metrics.clone(),
494                    db_options.clone(),
495                    pruner_watermark.clone(),
496                    |(_, event_id): (ModuleId, EventId)| event_id.0,
497                ),
498            ),
499            (
500                "event_by_sender".to_string(),
501                compaction_filter_config_by_key(
502                    "event_by_sender",
503                    compaction_metrics.clone(),
504                    db_options.clone(),
505                    pruner_watermark.clone(),
506                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
507                ),
508            ),
509            (
510                "event_by_time".to_string(),
511                compaction_filter_config_by_key(
512                    "event_by_time",
513                    compaction_metrics.clone(),
514                    db_options.clone(),
515                    pruner_watermark.clone(),
516                    |(_, event_id): (u64, EventId)| event_id.0,
517                ),
518            ),
519        ]));
520        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
521            path,
522            MetricConf::new("index"),
523            Some(db_options.options),
524            Some(table_options),
525            remove_deprecated_tables,
526        );
527
528        let metrics = IndexStoreMetrics::new(registry);
529        let caches = IndexStoreCaches {
530            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
531            all_balances: ShardedLruCache::new(1_000_000, 1000),
532            locks: MutexTable::new(128),
533        };
534        let next_sequence_number = tables
535            .transaction_order
536            .reversed_safe_iter_with_bounds(None, None)
537            .expect("failed to initialize indexes")
538            .next()
539            .transpose()
540            .expect("failed to initialize indexes")
541            .map(|(seq, _)| seq + 1)
542            .unwrap_or(0)
543            .into();
544        let pruner_watermark_value = tables
545            .pruner_watermark
546            .get(&())
547            .expect("failed to initialize index tables")
548            .unwrap_or(0);
549        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
550
551        Self {
552            tables,
553            next_sequence_number,
554            caches,
555            metrics: Arc::new(metrics),
556            max_type_length: max_type_length.unwrap_or(128),
557            remove_deprecated_tables,
558            pruner_watermark,
559        }
560    }
561
562    pub fn new(
563        path: PathBuf,
564        registry: &Registry,
565        max_type_length: Option<u64>,
566        remove_deprecated_tables: bool,
567    ) -> Self {
568        let mut store =
569            Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
570        store.tables.init().unwrap();
571        store
572    }
573
574    pub fn tables(&self) -> &IndexStoreTables {
575        &self.tables
576    }
577
578    #[instrument(skip_all)]
579    pub fn index_coin(
580        &self,
581        digest: &TransactionDigest,
582        batch: &mut DBBatch,
583        object_index_changes: &ObjectIndexChanges,
584        tx_coins: Option<TxCoins>,
585    ) -> SuiResult<IndexStoreCacheUpdates> {
586        // In production if this code path is hit, we should expect `tx_coins` to not be None.
587        // However, in many tests today we do not distinguish validator and/or fullnode, so
588        // we gracefully exist here.
589        if tx_coins.is_none() {
590            return Ok(IndexStoreCacheUpdates::default());
591        }
592        // Acquire locks on changed coin owners
593        let mut addresses: HashSet<SuiAddress> = HashSet::new();
594        addresses.extend(
595            object_index_changes
596                .deleted_owners
597                .iter()
598                .map(|(owner, _)| *owner),
599        );
600        addresses.extend(
601            object_index_changes
602                .new_owners
603                .iter()
604                .map(|((owner, _), _)| *owner),
605        );
606        let _locks = self.caches.locks.acquire_locks(addresses.into_iter());
607        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
608            HashMap::new();
609        // Index coin info
610        let (input_coins, written_coins) = tx_coins.unwrap();
611
612        // 1. Remove old coins from the DB by looking at the set of input coin objects
613        let coin_delete_keys = input_coins
614            .values()
615            .filter_map(|object| {
616                // only process address owned coins
617                let Owner::AddressOwner(owner) = object.owner() else {
618                    return None;
619                };
620
621                // only process coin types
622                let (coin_type, coin) = object
623                    .coin_type_maybe()
624                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
625
626                let key = CoinIndexKey2::new(
627                    *owner,
628                    coin_type.to_string(),
629                    coin.balance.value(),
630                    object.id(),
631                );
632
633                let map = balance_changes.entry(*owner).or_default();
634                let entry = map.entry(coin_type).or_insert(TotalBalance {
635                    num_coins: 0,
636                    balance: 0,
637                });
638                entry.num_coins -= 1;
639                entry.balance -= coin.balance.value() as i128;
640
641                Some(key)
642            })
643            .collect::<Vec<_>>();
644        trace!(
645            tx_digset=?digest,
646            "coin_delete_keys: {:?}",
647            coin_delete_keys,
648        );
649        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
650
651        // 2. Insert new coins, or new versions of coins, by looking at `written_coins`.
652        let coin_add_keys = written_coins
653            .values()
654            .filter_map(|object| {
655                // only process address owned coins
656                let Owner::AddressOwner(owner) = object.owner() else {
657                    return None;
658                };
659
660                // only process coin types
661                let (coin_type, coin) = object
662                    .coin_type_maybe()
663                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
664
665                let key = CoinIndexKey2::new(
666                    *owner,
667                    coin_type.to_string(),
668                    coin.balance.value(),
669                    object.id(),
670                );
671                let value = CoinInfo {
672                    version: object.version(),
673                    digest: object.digest(),
674                    balance: coin.balance.value(),
675                    previous_transaction: object.previous_transaction,
676                };
677                let map = balance_changes.entry(*owner).or_default();
678                let entry = map.entry(coin_type).or_insert(TotalBalance {
679                    num_coins: 0,
680                    balance: 0,
681                });
682                entry.num_coins += 1;
683                entry.balance += coin.balance.value() as i128;
684
685                Some((key, value))
686            })
687            .collect::<Vec<_>>();
688        trace!(
689            tx_digset=?digest,
690            "coin_add_keys: {:?}",
691            coin_add_keys,
692        );
693
694        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
695
696        let per_coin_type_balance_changes: Vec<_> = balance_changes
697            .iter()
698            .flat_map(|(address, balance_map)| {
699                balance_map.iter().map(|(type_tag, balance)| {
700                    (
701                        (*address, type_tag.clone()),
702                        Ok::<TotalBalance, SuiError>(*balance),
703                    )
704                })
705            })
706            .collect();
707        let all_balance_changes: Vec<_> = balance_changes
708            .into_iter()
709            .map(|(address, balance_map)| {
710                (
711                    address,
712                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
713                )
714            })
715            .collect();
716        let cache_updates = IndexStoreCacheUpdates {
717            _locks,
718            per_coin_type_balance_changes,
719            all_balance_changes,
720        };
721        Ok(cache_updates)
722    }
723
724    #[instrument(skip_all)]
725    pub fn index_tx(
726        &self,
727        sender: SuiAddress,
728        active_inputs: impl Iterator<Item = ObjectID>,
729        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
730        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
731        events: &TransactionEvents,
732        object_index_changes: ObjectIndexChanges,
733        digest: &TransactionDigest,
734        timestamp_ms: u64,
735        tx_coins: Option<TxCoins>,
736    ) -> SuiResult<u64> {
737        let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
738        let mut batch = self.tables.transactions_from_addr.batch();
739
740        batch.insert_batch(
741            &self.tables.transaction_order,
742            std::iter::once((sequence, *digest)),
743        )?;
744
745        batch.insert_batch(
746            &self.tables.transactions_seq,
747            std::iter::once((*digest, sequence)),
748        )?;
749
750        batch.insert_batch(
751            &self.tables.transactions_from_addr,
752            std::iter::once(((sender, sequence), *digest)),
753        )?;
754
755        #[allow(deprecated)]
756        if !self.remove_deprecated_tables {
757            batch.insert_batch(
758                &self.tables.transactions_by_input_object_id,
759                active_inputs.map(|id| ((id, sequence), *digest)),
760            )?;
761
762            batch.insert_batch(
763                &self.tables.transactions_by_mutated_object_id,
764                mutated_objects
765                    .clone()
766                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
767            )?;
768        }
769
770        batch.insert_batch(
771            &self.tables.transactions_by_move_function,
772            move_functions
773                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
774        )?;
775
776        batch.insert_batch(
777            &self.tables.transactions_to_addr,
778            mutated_objects.filter_map(|(_, owner)| {
779                owner
780                    .get_address_owner_address()
781                    .ok()
782                    .map(|addr| ((addr, sequence), digest))
783            }),
784        )?;
785
786        // Coin Index
787        let cache_updates = self.index_coin(digest, &mut batch, &object_index_changes, tx_coins)?;
788
789        // Owner index
790        batch.delete_batch(
791            &self.tables.owner_index,
792            object_index_changes.deleted_owners.into_iter(),
793        )?;
794        batch.delete_batch(
795            &self.tables.dynamic_field_index,
796            object_index_changes.deleted_dynamic_fields.into_iter(),
797        )?;
798
799        batch.insert_batch(
800            &self.tables.owner_index,
801            object_index_changes.new_owners.into_iter(),
802        )?;
803
804        batch.insert_batch(
805            &self.tables.dynamic_field_index,
806            object_index_changes.new_dynamic_fields.into_iter(),
807        )?;
808
809        // events
810        let event_digest = events.digest();
811        batch.insert_batch(
812            &self.tables.event_order,
813            events
814                .data
815                .iter()
816                .enumerate()
817                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
818        )?;
819        batch.insert_batch(
820            &self.tables.event_by_move_module,
821            events
822                .data
823                .iter()
824                .enumerate()
825                .map(|(i, e)| {
826                    (
827                        i,
828                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
829                    )
830                })
831                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
832        )?;
833        batch.insert_batch(
834            &self.tables.event_by_sender,
835            events.data.iter().enumerate().map(|(i, e)| {
836                (
837                    (e.sender, (sequence, i)),
838                    (event_digest, *digest, timestamp_ms),
839                )
840            }),
841        )?;
842        batch.insert_batch(
843            &self.tables.event_by_move_event,
844            events.data.iter().enumerate().map(|(i, e)| {
845                (
846                    (e.type_.clone(), (sequence, i)),
847                    (event_digest, *digest, timestamp_ms),
848                )
849            }),
850        )?;
851
852        batch.insert_batch(
853            &self.tables.event_by_time,
854            events.data.iter().enumerate().map(|(i, _)| {
855                (
856                    (timestamp_ms, (sequence, i)),
857                    (event_digest, *digest, timestamp_ms),
858                )
859            }),
860        )?;
861
862        batch.insert_batch(
863            &self.tables.event_by_event_module,
864            events.data.iter().enumerate().map(|(i, e)| {
865                (
866                    (
867                        ModuleId::new(e.type_.address, e.type_.module.clone()),
868                        (sequence, i),
869                    ),
870                    (event_digest, *digest, timestamp_ms),
871                )
872            }),
873        )?;
874
875        let invalidate_caches =
876            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
877
878        if invalidate_caches {
879            // Invalidate cache before writing to db so we always serve latest values
880            self.invalidate_per_coin_type_cache(
881                cache_updates
882                    .per_coin_type_balance_changes
883                    .iter()
884                    .map(|x| x.0.clone()),
885            )?;
886            self.invalidate_all_balance_cache(
887                cache_updates.all_balance_changes.iter().map(|x| x.0),
888            )?;
889        }
890
891        batch.write()?;
892
893        if !invalidate_caches {
894            // We cannot update the cache before updating the db or else on failing to write to db
895            // we will update the cache twice). However, this only means cache is eventually consistent with
896            // the db (within a very short delay)
897            self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)?;
898            self.update_all_balance_cache(cache_updates.all_balance_changes)?;
899        }
900        Ok(sequence)
901    }
902
903    pub fn next_sequence_number(&self) -> TxSequenceNumber {
904        self.next_sequence_number.load(Ordering::SeqCst) + 1
905    }
906
907    #[instrument(skip(self))]
908    pub fn get_transactions(
909        &self,
910        filter: Option<TransactionFilter>,
911        cursor: Option<TransactionDigest>,
912        limit: Option<usize>,
913        reverse: bool,
914    ) -> SuiResult<Vec<TransactionDigest>> {
915        // Lookup TransactionDigest sequence number,
916        let cursor = if let Some(cursor) = cursor {
917            Some(
918                self.get_transaction_seq(&cursor)?
919                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
920            )
921        } else {
922            None
923        };
924        match filter {
925            Some(TransactionFilter::MoveFunction {
926                package,
927                module,
928                function,
929            }) => Ok(self.get_transactions_by_move_function(
930                package, module, function, cursor, limit, reverse,
931            )?),
932            Some(TransactionFilter::InputObject(object_id)) => {
933                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
934            }
935            Some(TransactionFilter::ChangedObject(object_id)) => {
936                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
937            }
938            Some(TransactionFilter::FromAddress(address)) => {
939                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
940            }
941            Some(TransactionFilter::ToAddress(address)) => {
942                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
943            }
944            // NOTE: filter via checkpoint sequence number is implemented in
945            // `get_transactions` of authority.rs.
946            Some(_) => Err(SuiErrorKind::UserInputError {
947                error: UserInputError::Unsupported(format!("{:?}", filter)),
948            }
949            .into()),
950            None => {
951                if reverse {
952                    let iter = self
953                        .tables
954                        .transaction_order
955                        .reversed_safe_iter_with_bounds(
956                            None,
957                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
958                        )?
959                        .skip(usize::from(cursor.is_some()))
960                        .map(|result| result.map(|(_, digest)| digest));
961                    if let Some(limit) = limit {
962                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
963                    } else {
964                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
965                    }
966                } else {
967                    let iter = self
968                        .tables
969                        .transaction_order
970                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
971                        .skip(usize::from(cursor.is_some()))
972                        .map(|result| result.map(|(_, digest)| digest));
973                    if let Some(limit) = limit {
974                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
975                    } else {
976                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
977                    }
978                }
979            }
980        }
981    }
982
983    #[instrument(skip_all)]
984    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
985        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
986        key: KeyT,
987        cursor: Option<TxSequenceNumber>,
988        limit: Option<usize>,
989        reverse: bool,
990    ) -> SuiResult<Vec<TransactionDigest>> {
991        Ok(if reverse {
992            let iter = index
993                .reversed_safe_iter_with_bounds(
994                    None,
995                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
996                )?
997                // skip one more if exclusive cursor is Some
998                .skip(usize::from(cursor.is_some()))
999                .take_while(|result| {
1000                    result
1001                        .as_ref()
1002                        .map(|((id, _), _)| *id == key)
1003                        .unwrap_or(false)
1004                })
1005                .map(|result| result.map(|(_, digest)| digest));
1006            if let Some(limit) = limit {
1007                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1008            } else {
1009                iter.collect::<Result<Vec<_>, _>>()?
1010            }
1011        } else {
1012            let iter = index
1013                .safe_iter_with_bounds(
1014                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
1015                    None,
1016                )
1017                // skip one more if exclusive cursor is Some
1018                .skip(usize::from(cursor.is_some()))
1019                .map(|result| result.expect("iterator db error"))
1020                .take_while(|((id, _), _)| *id == key)
1021                .map(|(_, digest)| digest);
1022            if let Some(limit) = limit {
1023                iter.take(limit).collect()
1024            } else {
1025                iter.collect()
1026            }
1027        })
1028    }
1029
1030    #[instrument(skip(self))]
1031    pub fn get_transactions_by_input_object(
1032        &self,
1033        input_object: ObjectID,
1034        cursor: Option<TxSequenceNumber>,
1035        limit: Option<usize>,
1036        reverse: bool,
1037    ) -> SuiResult<Vec<TransactionDigest>> {
1038        if self.remove_deprecated_tables {
1039            return Ok(vec![]);
1040        }
1041        #[allow(deprecated)]
1042        Self::get_transactions_from_index(
1043            &self.tables.transactions_by_input_object_id,
1044            input_object,
1045            cursor,
1046            limit,
1047            reverse,
1048        )
1049    }
1050
1051    #[instrument(skip(self))]
1052    pub fn get_transactions_by_mutated_object(
1053        &self,
1054        mutated_object: ObjectID,
1055        cursor: Option<TxSequenceNumber>,
1056        limit: Option<usize>,
1057        reverse: bool,
1058    ) -> SuiResult<Vec<TransactionDigest>> {
1059        if self.remove_deprecated_tables {
1060            return Ok(vec![]);
1061        }
1062        #[allow(deprecated)]
1063        Self::get_transactions_from_index(
1064            &self.tables.transactions_by_mutated_object_id,
1065            mutated_object,
1066            cursor,
1067            limit,
1068            reverse,
1069        )
1070    }
1071
1072    #[instrument(skip(self))]
1073    pub fn get_transactions_from_addr(
1074        &self,
1075        addr: SuiAddress,
1076        cursor: Option<TxSequenceNumber>,
1077        limit: Option<usize>,
1078        reverse: bool,
1079    ) -> SuiResult<Vec<TransactionDigest>> {
1080        Self::get_transactions_from_index(
1081            &self.tables.transactions_from_addr,
1082            addr,
1083            cursor,
1084            limit,
1085            reverse,
1086        )
1087    }
1088
1089    #[instrument(skip(self))]
1090    pub fn get_transactions_by_move_function(
1091        &self,
1092        package: ObjectID,
1093        module: Option<String>,
1094        function: Option<String>,
1095        cursor: Option<TxSequenceNumber>,
1096        limit: Option<usize>,
1097        reverse: bool,
1098    ) -> SuiResult<Vec<TransactionDigest>> {
1099        // If we are passed a function with no module return a UserInputError
1100        if function.is_some() && module.is_none() {
1101            return Err(SuiErrorKind::UserInputError {
1102                error: UserInputError::MoveFunctionInputError(
1103                    "Cannot supply function without supplying module".to_string(),
1104                ),
1105            }
1106            .into());
1107        }
1108
1109        // We cannot have a cursor without filling out the other keys.
1110        if cursor.is_some() && (module.is_none() || function.is_none()) {
1111            return Err(SuiErrorKind::UserInputError {
1112                error: UserInputError::MoveFunctionInputError(
1113                    "Cannot supply cursor without supplying module and function".to_string(),
1114                ),
1115            }
1116            .into());
1117        }
1118
1119        let cursor_val = cursor.unwrap_or(if reverse {
1120            TxSequenceNumber::MAX
1121        } else {
1122            TxSequenceNumber::MIN
1123        });
1124
1125        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1126        let module_val = module.clone().unwrap_or(if reverse {
1127            max_string.clone()
1128        } else {
1129            "".to_string()
1130        });
1131
1132        let function_val =
1133            function
1134                .clone()
1135                .unwrap_or(if reverse { max_string } else { "".to_string() });
1136
1137        let key = (package, module_val, function_val, cursor_val);
1138        Ok(if reverse {
1139            let iter = self
1140                .tables
1141                .transactions_by_move_function
1142                .reversed_safe_iter_with_bounds(None, Some(key))?
1143                // skip one more if exclusive cursor is Some
1144                .skip(usize::from(cursor.is_some()))
1145                .take_while(|result| {
1146                    result
1147                        .as_ref()
1148                        .map(|((id, m, f, _), _)| {
1149                            *id == package
1150                                && module.as_ref().map(|x| x == m).unwrap_or(true)
1151                                && function.as_ref().map(|x| x == f).unwrap_or(true)
1152                        })
1153                        .unwrap_or(false)
1154                })
1155                .map(|result| result.map(|(_, digest)| digest));
1156            if let Some(limit) = limit {
1157                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1158            } else {
1159                iter.collect::<Result<Vec<_>, _>>()?
1160            }
1161        } else {
1162            let iter = self
1163                .tables
1164                .transactions_by_move_function
1165                .safe_iter_with_bounds(Some(key), None)
1166                .map(|result| result.expect("iterator db error"))
1167                // skip one more if exclusive cursor is Some
1168                .skip(usize::from(cursor.is_some()))
1169                .take_while(|((id, m, f, _), _)| {
1170                    *id == package
1171                        && module.as_ref().map(|x| x == m).unwrap_or(true)
1172                        && function.as_ref().map(|x| x == f).unwrap_or(true)
1173                })
1174                .map(|(_, digest)| digest);
1175            if let Some(limit) = limit {
1176                iter.take(limit).collect()
1177            } else {
1178                iter.collect()
1179            }
1180        })
1181    }
1182
1183    #[instrument(skip(self))]
1184    pub fn get_transactions_to_addr(
1185        &self,
1186        addr: SuiAddress,
1187        cursor: Option<TxSequenceNumber>,
1188        limit: Option<usize>,
1189        reverse: bool,
1190    ) -> SuiResult<Vec<TransactionDigest>> {
1191        Self::get_transactions_from_index(
1192            &self.tables.transactions_to_addr,
1193            addr,
1194            cursor,
1195            limit,
1196            reverse,
1197        )
1198    }
1199
1200    #[instrument(skip(self))]
1201    pub fn get_transaction_seq(
1202        &self,
1203        digest: &TransactionDigest,
1204    ) -> SuiResult<Option<TxSequenceNumber>> {
1205        Ok(self.tables.transactions_seq.get(digest)?)
1206    }
1207
1208    #[instrument(skip(self))]
1209    pub fn all_events(
1210        &self,
1211        tx_seq: TxSequenceNumber,
1212        event_seq: usize,
1213        limit: usize,
1214        descending: bool,
1215    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1216        Ok(if descending {
1217            self.tables
1218                .event_order
1219                .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1220                .take(limit)
1221                .map(|result| {
1222                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1223                        (digest, tx_digest, event_seq, time)
1224                    })
1225                })
1226                .collect::<Result<Vec<_>, _>>()?
1227        } else {
1228            self.tables
1229                .event_order
1230                .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1231                .take(limit)
1232                .map(|result| {
1233                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1234                        (digest, tx_digest, event_seq, time)
1235                    })
1236                })
1237                .collect::<Result<Vec<_>, _>>()?
1238        })
1239    }
1240
1241    #[instrument(skip(self))]
1242    pub fn events_by_transaction(
1243        &self,
1244        digest: &TransactionDigest,
1245        tx_seq: TxSequenceNumber,
1246        event_seq: usize,
1247        limit: usize,
1248        descending: bool,
1249    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1250        let seq = self
1251            .get_transaction_seq(digest)?
1252            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1253        Ok(if descending {
1254            self.tables
1255                .event_order
1256                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1257                .take_while(|result| {
1258                    result
1259                        .as_ref()
1260                        .map(|((tx, _), _)| tx == &seq)
1261                        .unwrap_or(false)
1262                })
1263                .take(limit)
1264                .map(|result| {
1265                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1266                        (digest, tx_digest, event_seq, time)
1267                    })
1268                })
1269                .collect::<Result<Vec<_>, _>>()?
1270        } else {
1271            self.tables
1272                .event_order
1273                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1274                .map(|result| result.expect("iterator db error"))
1275                .take_while(|((tx, _), _)| tx == &seq)
1276                .take(limit)
1277                .map(|((_, event_seq), (digest, tx_digest, time))| {
1278                    (digest, tx_digest, event_seq, time)
1279                })
1280                .collect()
1281        })
1282    }
1283
1284    #[instrument(skip_all)]
1285    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1286        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1287        key: &KeyT,
1288        tx_seq: TxSequenceNumber,
1289        event_seq: usize,
1290        limit: usize,
1291        descending: bool,
1292    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1293        Ok(if descending {
1294            index
1295                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1296                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1297                .take(limit)
1298                .map(|result| {
1299                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1300                        (digest, tx_digest, event_seq, time)
1301                    })
1302                })
1303                .collect::<Result<Vec<_>, _>>()?
1304        } else {
1305            index
1306                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1307                .map(|result| result.expect("iterator db error"))
1308                .take_while(|((m, _), _)| m == key)
1309                .take(limit)
1310                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1311                    (digest, tx_digest, event_seq, time)
1312                })
1313                .collect()
1314        })
1315    }
1316
1317    #[instrument(skip(self))]
1318    pub fn events_by_module_id(
1319        &self,
1320        module: &ModuleId,
1321        tx_seq: TxSequenceNumber,
1322        event_seq: usize,
1323        limit: usize,
1324        descending: bool,
1325    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1326        Self::get_event_from_index(
1327            &self.tables.event_by_move_module,
1328            module,
1329            tx_seq,
1330            event_seq,
1331            limit,
1332            descending,
1333        )
1334    }
1335
1336    #[instrument(skip(self))]
1337    pub fn events_by_move_event_struct_name(
1338        &self,
1339        struct_name: &StructTag,
1340        tx_seq: TxSequenceNumber,
1341        event_seq: usize,
1342        limit: usize,
1343        descending: bool,
1344    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1345        Self::get_event_from_index(
1346            &self.tables.event_by_move_event,
1347            struct_name,
1348            tx_seq,
1349            event_seq,
1350            limit,
1351            descending,
1352        )
1353    }
1354
1355    #[instrument(skip(self))]
1356    pub fn events_by_move_event_module(
1357        &self,
1358        module_id: &ModuleId,
1359        tx_seq: TxSequenceNumber,
1360        event_seq: usize,
1361        limit: usize,
1362        descending: bool,
1363    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1364        Self::get_event_from_index(
1365            &self.tables.event_by_event_module,
1366            module_id,
1367            tx_seq,
1368            event_seq,
1369            limit,
1370            descending,
1371        )
1372    }
1373
1374    #[instrument(skip(self))]
1375    pub fn events_by_sender(
1376        &self,
1377        sender: &SuiAddress,
1378        tx_seq: TxSequenceNumber,
1379        event_seq: usize,
1380        limit: usize,
1381        descending: bool,
1382    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1383        Self::get_event_from_index(
1384            &self.tables.event_by_sender,
1385            sender,
1386            tx_seq,
1387            event_seq,
1388            limit,
1389            descending,
1390        )
1391    }
1392
1393    #[instrument(skip(self))]
1394    pub fn event_iterator(
1395        &self,
1396        start_time: u64,
1397        end_time: u64,
1398        tx_seq: TxSequenceNumber,
1399        event_seq: usize,
1400        limit: usize,
1401        descending: bool,
1402    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1403        Ok(if descending {
1404            self.tables
1405                .event_by_time
1406                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1407                .take_while(|result| {
1408                    result
1409                        .as_ref()
1410                        .map(|((m, _), _)| m >= &start_time)
1411                        .unwrap_or(false)
1412                })
1413                .take(limit)
1414                .map(|result| {
1415                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1416                        (digest, tx_digest, event_seq, time)
1417                    })
1418                })
1419                .collect::<Result<Vec<_>, _>>()?
1420        } else {
1421            self.tables
1422                .event_by_time
1423                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1424                .map(|result| result.expect("iterator db error"))
1425                .take_while(|((m, _), _)| m <= &end_time)
1426                .take(limit)
1427                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1428                    (digest, tx_digest, event_seq, time)
1429                })
1430                .collect()
1431        })
1432    }
1433
1434    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1435        match self
1436            .tables
1437            .event_by_time
1438            .reversed_safe_iter_with_bounds(
1439                None,
1440                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1441            )?
1442            .next()
1443            .transpose()?
1444        {
1445            Some(((_, (watermark, _)), _)) => {
1446                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1447                    info!(
1448                        "json rpc index pruning. Watermark is {} with digest {}",
1449                        watermark, digest
1450                    );
1451                }
1452                self.pruner_watermark.store(watermark, Ordering::Relaxed);
1453                self.tables.pruner_watermark.insert(&(), &watermark)?;
1454                Ok(watermark)
1455            }
1456            None => Ok(0),
1457        }
1458    }
1459
1460    pub fn get_dynamic_fields_iterator(
1461        &self,
1462        object: ObjectID,
1463        cursor: Option<ObjectID>,
1464    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1465    {
1466        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
1467    }
1468
1469    #[instrument(skip(self))]
1470    pub fn get_dynamic_field_object_id(
1471        &self,
1472        object: ObjectID,
1473        name_type: TypeTag,
1474        name_bcs_bytes: &[u8],
1475    ) -> SuiResult<Option<ObjectID>> {
1476        debug!(?object, "get_dynamic_field_object_id");
1477        let dynamic_field_id =
1478            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1479                |e| {
1480                    SuiErrorKind::Unknown(format!(
1481                        "Unable to generate dynamic field id. Got error: {e:?}"
1482                    ))
1483                },
1484            )?;
1485
1486        if let Some(info) = self
1487            .tables
1488            .dynamic_field_index
1489            .get(&(object, dynamic_field_id))?
1490        {
1491            // info.object_id != dynamic_field_id ==> is_wrapper
1492            debug_assert!(
1493                info.object_id == dynamic_field_id
1494                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1495            );
1496            return Ok(Some(info.object_id));
1497        }
1498
1499        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1500        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1501        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1502            object,
1503            &dynamic_object_field_type,
1504            name_bcs_bytes,
1505        )
1506        .map_err(|e| {
1507            SuiErrorKind::Unknown(format!(
1508                "Unable to generate dynamic field id. Got error: {e:?}"
1509            ))
1510        })?;
1511        if let Some(info) = self
1512            .tables
1513            .dynamic_field_index
1514            .get(&(object, dynamic_object_field_id))?
1515        {
1516            return Ok(Some(info.object_id));
1517        }
1518
1519        Ok(None)
1520    }
1521
1522    #[instrument(skip(self))]
1523    pub fn get_owner_objects(
1524        &self,
1525        owner: SuiAddress,
1526        cursor: Option<ObjectID>,
1527        limit: usize,
1528        filter: Option<SuiObjectDataFilter>,
1529    ) -> SuiResult<Vec<ObjectInfo>> {
1530        let cursor = match cursor {
1531            Some(cursor) => cursor,
1532            None => ObjectID::ZERO,
1533        };
1534        Ok(self
1535            .get_owner_objects_iterator(owner, cursor, filter)?
1536            .take(limit)
1537            .collect())
1538    }
1539
1540    pub fn get_owned_coins_iterator(
1541        coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1542        owner: SuiAddress,
1543        coin_type_tag: Option<String>,
1544    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1545        let all_coins = coin_type_tag.is_none();
1546        let starting_coin_type =
1547            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1548        let start_key =
1549            CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1550        Ok(coin_index
1551            .safe_iter_with_bounds(Some(start_key), None)
1552            .map(|result| result.expect("iterator db error"))
1553            .take_while(move |(key, _)| {
1554                if key.owner != owner {
1555                    return false;
1556                }
1557                if !all_coins && starting_coin_type != key.coin_type {
1558                    return false;
1559                }
1560                true
1561            }))
1562    }
1563
1564    pub fn get_owned_coins_iterator_with_cursor(
1565        &self,
1566        owner: SuiAddress,
1567        cursor: (String, u64, ObjectID),
1568        limit: usize,
1569        one_coin_type_only: bool,
1570    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1571        let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1572        let start_key = CoinIndexKey2::new_from_cursor(
1573            owner,
1574            starting_coin_type.clone(),
1575            inverted_balance,
1576            starting_object_id,
1577        );
1578        Ok(self
1579            .tables
1580            .coin_index_2
1581            .safe_iter_with_bounds(Some(start_key), None)
1582            .map(|result| result.expect("iterator db error"))
1583            .filter(move |(key, _)| key.object_id != starting_object_id)
1584            .enumerate()
1585            .take_while(move |(index, (key, _))| {
1586                if *index >= limit {
1587                    return false;
1588                }
1589                if key.owner != owner {
1590                    return false;
1591                }
1592                if one_coin_type_only && starting_coin_type != key.coin_type {
1593                    return false;
1594                }
1595                true
1596            })
1597            .map(|(_index, (key, info))| (key, info)))
1598    }
1599
1600    /// starting_object_id can be used to implement pagination, where a client remembers the last
1601    /// object id of each page, and use it to query the next page.
1602    pub fn get_owner_objects_iterator(
1603        &self,
1604        owner: SuiAddress,
1605        starting_object_id: ObjectID,
1606        filter: Option<SuiObjectDataFilter>,
1607    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1608        Ok(self
1609            .tables
1610            .owner_index
1611            // The object id 0 is the smallest possible
1612            .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1613            .map(|result| result.expect("iterator db error"))
1614            .skip(usize::from(starting_object_id != ObjectID::ZERO))
1615            .take_while(move |((address_owner, _), _)| address_owner == &owner)
1616            .filter(move |(_, o)| {
1617                if let Some(filter) = filter.as_ref() {
1618                    filter.matches(o)
1619                } else {
1620                    true
1621                }
1622            })
1623            .map(|(_, object_info)| object_info))
1624    }
1625
1626    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
1627        let mut batch = self.tables.owner_index.batch();
1628        batch.insert_batch(
1629            &self.tables.owner_index,
1630            object_index_changes.new_owners.into_iter(),
1631        )?;
1632        batch.insert_batch(
1633            &self.tables.dynamic_field_index,
1634            object_index_changes.new_dynamic_fields.into_iter(),
1635        )?;
1636        batch.write()?;
1637        Ok(())
1638    }
1639
1640    pub fn is_empty(&self) -> bool {
1641        self.tables.owner_index.is_empty()
1642    }
1643
1644    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1645        // We are checkpointing the whole db
1646        self.tables
1647            .transactions_from_addr
1648            .checkpoint_db(path)
1649            .map_err(Into::into)
1650    }
1651
1652    /// This method first gets the balance from `per_coin_type_balance` cache. On a cache miss, it
1653    /// gets the balance for passed in `coin_type` from the `all_balance` cache. Only on the second
1654    /// cache miss, we go to the database (expensive) and update the cache. Notice that db read is
1655    /// done with `spawn_blocking` as that is expected to block
1656    #[instrument(skip(self))]
1657    pub fn get_coin_object_balance(
1658        &self,
1659        owner: SuiAddress,
1660        coin_type: TypeTag,
1661    ) -> SuiResult<TotalBalance> {
1662        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1663        let cloned_coin_type = coin_type.clone();
1664        let metrics_cloned = self.metrics.clone();
1665        let coin_index_cloned = self.tables.coin_index_2.clone();
1666        if force_disable_cache {
1667            return Self::get_balance_from_db(
1668                metrics_cloned,
1669                coin_index_cloned,
1670                owner,
1671                cloned_coin_type,
1672            )
1673            .map_err(|e| {
1674                SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1675                    .into()
1676            });
1677        }
1678
1679        self.metrics.balance_lookup_from_total.inc();
1680
1681        let balance = self
1682            .caches
1683            .per_coin_type_balance
1684            .get(&(owner, coin_type.clone()));
1685        if let Some(balance) = balance {
1686            return balance;
1687        }
1688        // cache miss, lookup in all balance cache
1689        let all_balance = self.caches.all_balances.get(&owner.clone());
1690        if let Some(Ok(all_balance)) = all_balance
1691            && let Some(balance) = all_balance.get(&coin_type)
1692        {
1693            return Ok(*balance);
1694        }
1695        let cloned_coin_type = coin_type.clone();
1696        let metrics_cloned = self.metrics.clone();
1697        let coin_index_cloned = self.tables.coin_index_2.clone();
1698        self.caches
1699            .per_coin_type_balance
1700            .get_with((owner, coin_type), move || {
1701                Self::get_balance_from_db(
1702                    metrics_cloned,
1703                    coin_index_cloned,
1704                    owner,
1705                    cloned_coin_type,
1706                )
1707                .map_err(|e| {
1708                    SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1709                        .into()
1710                })
1711            })
1712    }
1713
1714    /// This method gets the balance for all coin types from the `all_balance` cache. On a cache miss,
1715    /// we go to the database (expensive) and update the cache. This cache is dual purpose in the
1716    /// sense that it not only serves `get_AllBalance()` calls but is also used for serving
1717    /// `get_Balance()` queries. Notice that db read is performed with `spawn_blocking` as that is
1718    /// expected to block
1719    #[instrument(skip(self))]
1720    pub fn get_all_coin_object_balances(
1721        &self,
1722        owner: SuiAddress,
1723    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1724        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1725        let metrics_cloned = self.metrics.clone();
1726        let coin_index_cloned = self.tables.coin_index_2.clone();
1727        if force_disable_cache {
1728            return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1729                .map_err(|e| {
1730                    SuiErrorKind::ExecutionError(format!(
1731                        "Failed to read all balance from DB: {:?}",
1732                        e
1733                    ))
1734                    .into()
1735                });
1736        }
1737
1738        self.metrics.all_balance_lookup_from_total.inc();
1739        let metrics_cloned = self.metrics.clone();
1740        let coin_index_cloned = self.tables.coin_index_2.clone();
1741        self.caches.all_balances.get_with(owner, move || {
1742            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
1743                SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
1744                    .into()
1745            })
1746        })
1747    }
1748
1749    /// Read balance for a `SuiAddress` and `CoinType` from the backend database
1750    #[instrument(skip_all)]
1751    pub fn get_balance_from_db(
1752        metrics: Arc<IndexStoreMetrics>,
1753        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1754        owner: SuiAddress,
1755        coin_type: TypeTag,
1756    ) -> SuiResult<TotalBalance> {
1757        metrics.balance_lookup_from_db.inc();
1758        let coin_type_str = coin_type.to_string();
1759        let coins =
1760            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
1761
1762        let mut balance = 0i128;
1763        let mut num_coins = 0;
1764        for (_key, coin_info) in coins {
1765            balance += coin_info.balance as i128;
1766            num_coins += 1;
1767        }
1768        Ok(TotalBalance { balance, num_coins })
1769    }
1770
1771    /// Read all balances for a `SuiAddress` from the backend database
1772    #[instrument(skip_all)]
1773    pub fn get_all_balances_from_db(
1774        metrics: Arc<IndexStoreMetrics>,
1775        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1776        owner: SuiAddress,
1777    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1778        metrics.all_balance_lookup_from_db.inc();
1779        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1780        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1781            .chunk_by(|(key, _coin)| key.coin_type.clone());
1782        for (coin_type, coins) in &coins {
1783            let mut total_balance = 0i128;
1784            let mut coin_object_count = 0;
1785            for (_, coin_info) in coins {
1786                total_balance += coin_info.balance as i128;
1787                coin_object_count += 1;
1788            }
1789            let coin_type =
1790                TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
1791                    SuiErrorKind::ExecutionError(format!(
1792                        "Failed to parse event sender address: {:?}",
1793                        e
1794                    ))
1795                })?));
1796            balances.insert(
1797                coin_type,
1798                TotalBalance {
1799                    num_coins: coin_object_count,
1800                    balance: total_balance,
1801                },
1802            );
1803        }
1804        Ok(Arc::new(balances))
1805    }
1806
1807    fn invalidate_per_coin_type_cache(
1808        &self,
1809        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
1810    ) -> SuiResult {
1811        self.caches.per_coin_type_balance.batch_invalidate(keys);
1812        Ok(())
1813    }
1814
1815    fn invalidate_all_balance_cache(
1816        &self,
1817        addresses: impl IntoIterator<Item = SuiAddress>,
1818    ) -> SuiResult {
1819        self.caches.all_balances.batch_invalidate(addresses);
1820        Ok(())
1821    }
1822
1823    fn update_per_coin_type_cache(
1824        &self,
1825        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
1826    ) -> SuiResult {
1827        self.caches
1828            .per_coin_type_balance
1829            .batch_merge(keys, Self::merge_balance);
1830        Ok(())
1831    }
1832
1833    fn merge_balance(
1834        old_balance: &SuiResult<TotalBalance>,
1835        balance_delta: &SuiResult<TotalBalance>,
1836    ) -> SuiResult<TotalBalance> {
1837        if let Ok(old_balance) = old_balance {
1838            if let Ok(balance_delta) = balance_delta {
1839                Ok(TotalBalance {
1840                    balance: old_balance.balance + balance_delta.balance,
1841                    num_coins: old_balance.num_coins + balance_delta.num_coins,
1842                })
1843            } else {
1844                balance_delta.clone()
1845            }
1846        } else {
1847            old_balance.clone()
1848        }
1849    }
1850
1851    fn update_all_balance_cache(
1852        &self,
1853        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1854    ) -> SuiResult {
1855        self.caches
1856            .all_balances
1857            .batch_merge(keys, Self::merge_all_balance);
1858        Ok(())
1859    }
1860
1861    fn merge_all_balance(
1862        old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1863        balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1864    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1865        if let Ok(old_balance) = old_balance {
1866            if let Ok(balance_delta) = balance_delta {
1867                let mut new_balance = HashMap::new();
1868                for (key, value) in old_balance.iter() {
1869                    new_balance.insert(key.clone(), *value);
1870                }
1871                for (key, delta) in balance_delta.iter() {
1872                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1873                        balance: 0,
1874                        num_coins: 0,
1875                    });
1876                    let new_total = TotalBalance {
1877                        balance: old.balance + delta.balance,
1878                        num_coins: old.num_coins + delta.num_coins,
1879                    };
1880                    new_balance.insert(key.clone(), new_total);
1881                }
1882                Ok(Arc::new(new_balance))
1883            } else {
1884                balance_delta.clone()
1885            }
1886        } else {
1887            old_balance.clone()
1888        }
1889    }
1890}
1891
1892#[cfg(test)]
1893mod tests {
1894    use super::IndexStore;
1895    use super::ObjectIndexChanges;
1896    use move_core_types::account_address::AccountAddress;
1897    use prometheus::Registry;
1898    use std::collections::BTreeMap;
1899    use std::env::temp_dir;
1900    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
1901    use sui_types::digests::TransactionDigest;
1902    use sui_types::effects::TransactionEvents;
1903    use sui_types::gas_coin::GAS;
1904    use sui_types::object;
1905    use sui_types::object::Owner;
1906
1907    #[tokio::test]
1908    async fn test_index_cache() -> anyhow::Result<()> {
1909        // This test is going to invoke `index_tx()`where 10 coins each with balance 100
1910        // are going to be added to an address. The balance is then going to be read from the db
1911        // and the cache. It should be 1000. Then, we are going to delete 3 of those coins from
1912        // the address and invoke `index_tx()` again and read balance. The balance should be 700
1913        // and verified from both db and cache.
1914        // This tests make sure we are invalidating entries in the cache and always reading latest
1915        // balance.
1916        let index_store =
1917            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
1918        let address: SuiAddress = AccountAddress::random().into();
1919        let mut written_objects = BTreeMap::new();
1920        let mut input_objects = BTreeMap::new();
1921        let mut object_map = BTreeMap::new();
1922
1923        let mut new_objects = vec![];
1924        for _i in 0..10 {
1925            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
1926            new_objects.push((
1927                (address, object.id()),
1928                ObjectInfo {
1929                    object_id: object.id(),
1930                    version: object.version(),
1931                    digest: object.digest(),
1932                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
1933                    owner: Owner::AddressOwner(address),
1934                    previous_transaction: object.previous_transaction,
1935                },
1936            ));
1937            object_map.insert(object.id(), object.clone());
1938            written_objects.insert(object.data.id(), object);
1939        }
1940        let object_index_changes = ObjectIndexChanges {
1941            deleted_owners: vec![],
1942            deleted_dynamic_fields: vec![],
1943            new_owners: new_objects,
1944            new_dynamic_fields: vec![],
1945        };
1946
1947        let tx_coins = (input_objects.clone(), written_objects.clone());
1948        index_store.index_tx(
1949            address,
1950            vec![].into_iter(),
1951            vec![].into_iter(),
1952            vec![].into_iter(),
1953            &TransactionEvents { data: vec![] },
1954            object_index_changes,
1955            &TransactionDigest::random(),
1956            1234,
1957            Some(tx_coins),
1958        )?;
1959
1960        let balance_from_db = IndexStore::get_balance_from_db(
1961            index_store.metrics.clone(),
1962            index_store.tables.coin_index_2.clone(),
1963            address,
1964            GAS::type_tag(),
1965        )?;
1966        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
1967        assert_eq!(balance, balance_from_db);
1968        assert_eq!(balance.balance, 1000);
1969        assert_eq!(balance.num_coins, 10);
1970
1971        let all_balance = index_store.get_all_coin_object_balances(address)?;
1972        let balance = all_balance.get(&GAS::type_tag()).unwrap();
1973        assert_eq!(*balance, balance_from_db);
1974        assert_eq!(balance.balance, 1000);
1975        assert_eq!(balance.num_coins, 10);
1976
1977        written_objects.clear();
1978        let mut deleted_objects = vec![];
1979        for (id, object) in object_map.iter().take(3) {
1980            deleted_objects.push((address, *id));
1981            input_objects.insert(*id, object.to_owned());
1982        }
1983        let object_index_changes = ObjectIndexChanges {
1984            deleted_owners: deleted_objects.clone(),
1985            deleted_dynamic_fields: vec![],
1986            new_owners: vec![],
1987            new_dynamic_fields: vec![],
1988        };
1989        let tx_coins = (input_objects, written_objects);
1990        index_store.index_tx(
1991            address,
1992            vec![].into_iter(),
1993            vec![].into_iter(),
1994            vec![].into_iter(),
1995            &TransactionEvents { data: vec![] },
1996            object_index_changes,
1997            &TransactionDigest::random(),
1998            1234,
1999            Some(tx_coins),
2000        )?;
2001        let balance_from_db = IndexStore::get_balance_from_db(
2002            index_store.metrics.clone(),
2003            index_store.tables.coin_index_2.clone(),
2004            address,
2005            GAS::type_tag(),
2006        )?;
2007        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2008        assert_eq!(balance, balance_from_db);
2009        assert_eq!(balance.balance, 700);
2010        assert_eq!(balance.num_coins, 7);
2011        // Invalidate per coin type balance cache and read from all balance cache to ensure
2012        // the balance matches
2013        index_store
2014            .caches
2015            .per_coin_type_balance
2016            .invalidate(&(address, GAS::type_tag()));
2017        let all_balance = index_store.get_all_coin_object_balances(address)?;
2018        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
2019        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
2020        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2021        assert_eq!(balance, balance_from_db);
2022        assert_eq!(balance.balance, 700);
2023        assert_eq!(balance.num_coins, 7);
2024
2025        Ok(())
2026    }
2027
2028    #[tokio::test]
2029    async fn test_get_transaction_by_move_function() {
2030        use sui_types::base_types::ObjectID;
2031        use typed_store::Map;
2032
2033        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
2034        let db = &index_store.tables.transactions_by_move_function;
2035        db.insert(
2036            &(
2037                ObjectID::new([1; 32]),
2038                "mod".to_string(),
2039                "f".to_string(),
2040                0,
2041            ),
2042            &[0; 32].into(),
2043        )
2044        .unwrap();
2045        db.insert(
2046            &(
2047                ObjectID::new([1; 32]),
2048                "mod".to_string(),
2049                "Z".repeat(128),
2050                0,
2051            ),
2052            &[1; 32].into(),
2053        )
2054        .unwrap();
2055        db.insert(
2056            &(
2057                ObjectID::new([1; 32]),
2058                "mod".to_string(),
2059                "f".repeat(128),
2060                0,
2061            ),
2062            &[2; 32].into(),
2063        )
2064        .unwrap();
2065        db.insert(
2066            &(
2067                ObjectID::new([1; 32]),
2068                "mod".to_string(),
2069                "z".repeat(128),
2070                0,
2071            ),
2072            &[3; 32].into(),
2073        )
2074        .unwrap();
2075
2076        let mut v = index_store
2077            .get_transactions_by_move_function(
2078                ObjectID::new([1; 32]),
2079                Some("mod".to_string()),
2080                None,
2081                None,
2082                None,
2083                false,
2084            )
2085            .unwrap();
2086        let v_rev = index_store
2087            .get_transactions_by_move_function(
2088                ObjectID::new([1; 32]),
2089                Some("mod".to_string()),
2090                None,
2091                None,
2092                None,
2093                true,
2094            )
2095            .unwrap();
2096        v.reverse();
2097        assert_eq!(v, v_rev);
2098    }
2099}