sui_core/
jsonrpc_index.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! IndexStore supports creation of various ancillary indexes of state in SuiDataStore.
5//! The main user of this data is the explorer.
6
7use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19    register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use typed_store::TypedStoreError;
23use typed_store::rocksdb::compaction_filter::Decision;
24
25use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
26use sui_storage::mutex_table::MutexTable;
27use sui_storage::sharded_lru::ShardedLruCache;
28use sui_types::base_types::{
29    ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
30};
31use sui_types::base_types::{ObjectInfo, ObjectRef};
32use sui_types::digests::TransactionEventsDigest;
33use sui_types::dynamic_field::{self, DynamicFieldInfo};
34use sui_types::effects::TransactionEvents;
35use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
36use sui_types::inner_temporary_store::TxCoins;
37use sui_types::object::{Object, Owner};
38use sui_types::parse_sui_struct_tag;
39use sui_types::storage::error::Error as StorageError;
40use tracing::{debug, info, instrument, trace};
41use typed_store::DBMapUtils;
42use typed_store::rocks::{
43    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
44    read_size_from_env,
45};
46use typed_store::traits::Map;
47
/// `parking_lot` `ArcMutexGuard` specialised to the default `RawMutex`.
type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;

/// Key of the `owner_index` table: (owner address, object id).
type OwnerIndexKey = (SuiAddress, ObjectID);
/// Key of the `dynamic_field_index` table: (parent object id, dynamic field object id).
type DynamicFieldKey = (ObjectID, ObjectID);
/// Identifies one event: (transaction sequence number, position of the event in the transaction).
type EventId = (TxSequenceNumber, usize);
/// Value stored in the event tables: (events digest, transaction digest, timestamp in ms).
type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
/// Balance totals of one address, keyed by coin type.
type AllBalance = HashMap<TypeTag, TotalBalance>;
55
/// Key of the `coin_index_2` table.
///
/// `Ord` is derived, so keys compare field by field in declaration order: owner, then coin
/// type, then inverted balance, then object id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CoinIndexKey2 {
    /// Address that owns the coin.
    pub owner: SuiAddress,
    /// Coin type rendered as a string.
    pub coin_type: String,
    // the balance of the coin inverted `!coin.balance` in order to force sorting of coins to be
    // from greatest to least
    pub inverted_balance: u64,
    /// Object id of the coin object.
    pub object_id: ObjectID,
}
65
66impl CoinIndexKey2 {
67    pub fn new_from_cursor(
68        owner: SuiAddress,
69        coin_type: String,
70        inverted_balance: u64,
71        object_id: ObjectID,
72    ) -> Self {
73        Self {
74            owner,
75            coin_type,
76            inverted_balance,
77            object_id,
78        }
79    }
80
81    pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
82        Self {
83            owner,
84            coin_type,
85            inverted_balance: !balance,
86            object_id,
87        }
88    }
89}
90
/// Database version written into `MetadataInfo::version` when fresh metadata is created.
const CURRENT_DB_VERSION: u64 = 0;
/// Version number for the coin index; underscore-prefixed and not referenced in the visible
/// part of this file.
const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
93
/// Value stored in the `meta` singleton table of `IndexStoreTables`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Version of the Database
    version: u64,
    /// Version of each of the column families
    ///
    /// This is used to version individual column families to determine if a CF needs to be
    /// (re)initialized on startup.
    column_families: BTreeMap<String, ColumnFamilyInfo>,
}
104
/// Per-column-family entry of `MetadataInfo::column_families`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ColumnFamilyInfo {
    /// Version recorded for the column family.
    version: u64,
}
109
// NOTE(review): presumably the largest transaction range a single query may cover; the use
// site is outside the visible part of this file.
pub const MAX_TX_RANGE_SIZE: u64 = 4096;

// NOTE(review): presumably the cap on objects returned by one owned-object query; the use
// site is outside the visible part of this file.
pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
// Environment variable read by `coin_index_table_default_config` to size the read cache of the
// coin index table.
const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
// NOTE(review): not read in the visible part of this file; by its name it disables the
// balance caches.
const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
// When this variable holds a value greater than zero, `index_tx` invalidates the balance caches
// before writing the batch instead of updating them afterwards.
const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
116
/// Aggregate over the coins of one type held by one address.
///
/// Both fields are signed: `index_coin` also uses this type to accumulate per-transaction
/// deltas, which go negative when coins are consumed.
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
pub struct TotalBalance {
    /// Sum of coin balances (or the change of that sum).
    pub balance: i128,
    /// Number of coin objects (or the change of that number).
    pub num_coins: i64,
}
122
/// Changes one transaction makes to the owner and dynamic-field indexes.
///
/// `index_tx` applies the deletions and then the insertions to the `owner_index` and
/// `dynamic_field_index` tables within a single batch.
#[derive(Debug)]
pub struct ObjectIndexChanges {
    /// Keys to delete from `owner_index`.
    pub deleted_owners: Vec<OwnerIndexKey>,
    /// Keys to delete from `dynamic_field_index`.
    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
    /// Entries to insert into `owner_index`.
    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
    /// Entries to insert into `dynamic_field_index`.
    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
}
130
/// Value stored in the `coin_index_2` table for one coin object.
#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct CoinInfo {
    /// Version of the coin object.
    pub version: SequenceNumber,
    /// Digest of the coin object.
    pub digest: ObjectDigest,
    /// Balance held by the coin.
    pub balance: u64,
    /// The object's `previous_transaction` digest.
    pub previous_transaction: TransactionDigest,
}
138
139impl CoinInfo {
140    pub fn from_object(object: &Object) -> Option<CoinInfo> {
141        object.as_coin_maybe().map(|coin| CoinInfo {
142            version: object.version(),
143            digest: object.digest(),
144            previous_transaction: object.previous_transaction,
145            balance: coin.value(),
146        })
147    }
148}
149
/// Prometheus counters for balance lookups, registered by `IndexStoreMetrics::new`.
// NOTE(review): the increment sites are outside the visible part of this file; by their names
// the `*_from_db` counters count lookups that went to the database and the `*_from_total`
// counters count all lookups — confirm.
pub struct IndexStoreMetrics {
    balance_lookup_from_db: IntCounter,
    balance_lookup_from_total: IntCounter,
    all_balance_lookup_from_db: IntCounter,
    all_balance_lookup_from_total: IntCounter,
}
156
157impl IndexStoreMetrics {
158    pub fn new(registry: &Registry) -> IndexStoreMetrics {
159        Self {
160            balance_lookup_from_db: register_int_counter_with_registry!(
161                "balance_lookup_from_db",
162                "Total number of balance requests served from cache",
163                registry,
164            )
165            .unwrap(),
166            balance_lookup_from_total: register_int_counter_with_registry!(
167                "balance_lookup_from_total",
168                "Total number of balance requests served ",
169                registry,
170            )
171            .unwrap(),
172            all_balance_lookup_from_db: register_int_counter_with_registry!(
173                "all_balance_lookup_from_db",
174                "Total number of all balance requests served from cache",
175                registry,
176            )
177            .unwrap(),
178            all_balance_lookup_from_total: register_int_counter_with_registry!(
179                "all_balance_lookup_from_total",
180                "Total number of all balance requests served",
181                registry,
182            )
183            .unwrap(),
184        }
185    }
186}
187
/// In-memory caches of balance totals, plus the per-address locks that guard their updates.
pub struct IndexStoreCaches {
    /// Cached total for one (address, coin type) pair; the cached value is a `SuiResult`.
    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
    /// Cached totals of every coin type held by an address; the cached value is a `SuiResult`.
    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
    /// Per-address mutexes; `index_coin` locks every address whose owned objects changed.
    pub locks: MutexTable<SuiAddress>,
}
193
/// Balance-cache changes produced by `index_coin`, to be applied by the caller.
#[derive(Default)]
pub struct IndexStoreCacheUpdates {
    /// Guards returned by `caches.locks.acquire_locks`, carried along with the pending changes.
    _locks: Vec<OwnedMutexGuard<()>>,
    /// One balance delta per (address, coin type) touched by the transaction.
    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    /// For each touched address, the deltas of all its touched coin types.
    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
}
200
/// Column families that make up the JSON-RPC index database.
#[derive(DBMapUtils)]
pub struct IndexStoreTables {
    /// A singleton that stores metadata information on the DB.
    ///
    /// A few uses for this singleton:
    /// - determining if the DB has been initialized (as some tables could still be empty post
    ///   initialization)
    /// - version of each column family and their respective initialization status
    meta: DBMap<(), MetadataInfo>,

    /// Index from sui address to transactions initiated by that address.
    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    /// Index from sui address to transactions that were sent to that address.
    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    /// Index from object id to transactions that used that object id as input.
    #[deprecated]
    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// Index from object id to transactions that modified/created that object id.
    #[deprecated]
    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// Index from package id, module and function identifier to transactions that used that
    /// move function call as input.
    transactions_by_move_function:
        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,

    /// Ordering of all indexed transactions.
    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,

    /// Index from transaction digest to sequence number.
    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,

    /// This is an index of object references to currently existing objects, indexed by the
    /// composite key of the SuiAddress of their owner and the object ID of the object.
    /// This composite index allows an efficient iterator to list all objects currently owned
    /// by a specific user, and their object reference.
    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

    /// Index of address-owned coin objects, maintained by `IndexStore::index_coin`. Keys order
    /// the coins of one owner and coin type from largest to smallest balance.
    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,

    /// This is an index of object references to currently existing dynamic field object, indexed by the
    /// composite key of the object ID of their parent and the object ID of the dynamic field object.
    /// This composite index allows an efficient iterator to list all objects currently owned
    /// by a specific object, and their object reference.
    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,

    /// All indexed events, keyed by (transaction sequence number, event position).
    event_order: DBMap<EventId, EventIndex>,
    /// Events keyed by the module built from the event's `package_id` and `transaction_module`.
    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
    /// Events keyed by the struct tag of the event type.
    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
    /// Events keyed by the module (address and module name) of the event type.
    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
    /// Events keyed by the event's sender address.
    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
    /// Events keyed by the transaction timestamp in milliseconds.
    event_by_time: DBMap<(u64, EventId), EventIndex>,

    /// Singleton holding the persisted pruner watermark; it is loaded on startup into the
    /// atomic that the compaction filters compare sequence numbers against.
    pruner_watermark: DBMap<(), TxSequenceNumber>,
}
258
259impl IndexStoreTables {
260    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
261        &self.owner_index
262    }
263
264    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
265        &self.coin_index_2
266    }
267
268    #[allow(deprecated)]
269    fn init(&mut self) -> Result<(), StorageError> {
270        let metadata = {
271            match self.meta.get(&()) {
272                Ok(Some(metadata)) => metadata,
273                Ok(None) | Err(_) => MetadataInfo {
274                    version: CURRENT_DB_VERSION,
275                    column_families: BTreeMap::new(),
276                },
277            }
278        };
279
280        // Commit to the DB that the indexes have been initialized
281        self.meta.insert(&(), &metadata)?;
282
283        Ok(())
284    }
285}
286
/// The JSON-RPC index store: the index tables together with the balance caches and metrics.
pub struct IndexStore {
    /// Sequence number that `index_tx` assigns to the next indexed transaction.
    next_sequence_number: AtomicU64,
    /// The underlying column families.
    tables: IndexStoreTables,
    /// Balance caches and the per-address locks protecting them.
    pub caches: IndexStoreCaches,
    /// Balance-lookup counters.
    metrics: Arc<IndexStoreMetrics>,
    // NOTE(review): set from the constructor argument (default 128); the use site is outside
    // the visible part of this file.
    max_type_length: u64,
    /// When `false`, `index_tx` still writes to the deprecated per-object transaction tables.
    remove_deprecated_tables: bool,
    /// Watermark shared with the compaction filters: entries whose sequence number is below it
    /// are dropped during compaction.
    pruner_watermark: Arc<AtomicU64>,
}
296
/// Counters updated by the compaction filters, each labelled by column family (`cf`).
struct JsonRpcCompactionMetrics {
    /// Entries a filter decided to remove.
    key_removed: IntCounterVec,
    /// Entries a filter decided to keep.
    key_kept: IntCounterVec,
    /// Entries that could not be deserialized (these are kept).
    key_error: IntCounterVec,
}
302
303impl JsonRpcCompactionMetrics {
304    pub fn new(registry: &Registry) -> Arc<Self> {
305        Arc::new(Self {
306            key_removed: register_int_counter_vec_with_registry!(
307                "json_rpc_compaction_filter_key_removed",
308                "Compaction key removed",
309                &["cf"],
310                registry
311            )
312            .unwrap(),
313            key_kept: register_int_counter_vec_with_registry!(
314                "json_rpc_compaction_filter_key_kept",
315                "Compaction key kept",
316                &["cf"],
317                registry
318            )
319            .unwrap(),
320            key_error: register_int_counter_vec_with_registry!(
321                "json_rpc_compaction_filter_key_error",
322                "Compaction error",
323                &["cf"],
324                registry
325            )
326            .unwrap(),
327        })
328    }
329}
330
331fn compaction_filter_config<T: DeserializeOwned>(
332    name: &str,
333    metrics: Arc<JsonRpcCompactionMetrics>,
334    mut db_options: DBOptions,
335    pruner_watermark: Arc<AtomicU64>,
336    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
337    by_key: bool,
338) -> DBOptions {
339    let cf = name.to_string();
340    db_options
341        .options
342        .set_compaction_filter(name, move |_, key, value| {
343            let bytes = if by_key { key } else { value };
344            let deserializer = bincode::DefaultOptions::new()
345                .with_big_endian()
346                .with_fixint_encoding();
347            match deserializer.deserialize(bytes) {
348                Ok(key_data) => {
349                    let sequence_number = extractor(key_data);
350                    if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
351                        metrics.key_removed.with_label_values(&[&cf]).inc();
352                        Decision::Remove
353                    } else {
354                        metrics.key_kept.with_label_values(&[&cf]).inc();
355                        Decision::Keep
356                    }
357                }
358                Err(_) => {
359                    metrics.key_error.with_label_values(&[&cf]).inc();
360                    Decision::Keep
361                }
362            }
363        });
364    db_options
365}
366
367fn compaction_filter_config_by_key<T: DeserializeOwned>(
368    name: &str,
369    metrics: Arc<JsonRpcCompactionMetrics>,
370    db_options: DBOptions,
371    pruner_watermark: Arc<AtomicU64>,
372    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
373) -> DBOptions {
374    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
375}
376
377fn coin_index_table_default_config() -> DBOptions {
378    default_db_options()
379        .optimize_for_write_throughput()
380        .optimize_for_read(
381            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
382        )
383        .disable_write_throttling()
384}
385
386impl IndexStore {
387    pub fn new_without_init(
388        path: PathBuf,
389        registry: &Registry,
390        max_type_length: Option<u64>,
391        remove_deprecated_tables: bool,
392    ) -> Self {
393        let db_options = default_db_options().disable_write_throttling();
394        let pruner_watermark = Arc::new(AtomicU64::new(0));
395        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
396        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
397            (
398                "transactions_from_addr".to_string(),
399                compaction_filter_config_by_key(
400                    "transactions_from_addr",
401                    compaction_metrics.clone(),
402                    db_options.clone(),
403                    pruner_watermark.clone(),
404                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
405                ),
406            ),
407            (
408                "transactions_to_addr".to_string(),
409                compaction_filter_config_by_key(
410                    "transactions_to_addr",
411                    compaction_metrics.clone(),
412                    db_options.clone(),
413                    pruner_watermark.clone(),
414                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
415                ),
416            ),
417            (
418                "transactions_by_move_function".to_string(),
419                compaction_filter_config_by_key(
420                    "transactions_by_move_function",
421                    compaction_metrics.clone(),
422                    db_options.clone(),
423                    pruner_watermark.clone(),
424                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
425                ),
426            ),
427            (
428                "transaction_order".to_string(),
429                compaction_filter_config_by_key(
430                    "transaction_order",
431                    compaction_metrics.clone(),
432                    db_options.clone(),
433                    pruner_watermark.clone(),
434                    |sequence_number: TxSequenceNumber| sequence_number,
435                ),
436            ),
437            (
438                "transactions_seq".to_string(),
439                compaction_filter_config(
440                    "transactions_seq",
441                    compaction_metrics.clone(),
442                    db_options.clone(),
443                    pruner_watermark.clone(),
444                    |sequence_number: TxSequenceNumber| sequence_number,
445                    false,
446                ),
447            ),
448            (
449                "coin_index_2".to_string(),
450                coin_index_table_default_config(),
451            ),
452            (
453                "event_order".to_string(),
454                compaction_filter_config_by_key(
455                    "event_order",
456                    compaction_metrics.clone(),
457                    db_options.clone(),
458                    pruner_watermark.clone(),
459                    |event_id: EventId| event_id.0,
460                ),
461            ),
462            (
463                "event_by_move_module".to_string(),
464                compaction_filter_config_by_key(
465                    "event_by_move_module",
466                    compaction_metrics.clone(),
467                    db_options.clone(),
468                    pruner_watermark.clone(),
469                    |(_, event_id): (ModuleId, EventId)| event_id.0,
470                ),
471            ),
472            (
473                "event_by_event_module".to_string(),
474                compaction_filter_config_by_key(
475                    "event_by_event_module",
476                    compaction_metrics.clone(),
477                    db_options.clone(),
478                    pruner_watermark.clone(),
479                    |(_, event_id): (ModuleId, EventId)| event_id.0,
480                ),
481            ),
482            (
483                "event_by_sender".to_string(),
484                compaction_filter_config_by_key(
485                    "event_by_sender",
486                    compaction_metrics.clone(),
487                    db_options.clone(),
488                    pruner_watermark.clone(),
489                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
490                ),
491            ),
492            (
493                "event_by_time".to_string(),
494                compaction_filter_config_by_key(
495                    "event_by_time",
496                    compaction_metrics.clone(),
497                    db_options.clone(),
498                    pruner_watermark.clone(),
499                    |(_, event_id): (u64, EventId)| event_id.0,
500                ),
501            ),
502        ]));
503        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
504            path,
505            MetricConf::new("index"),
506            Some(db_options.options),
507            Some(table_options),
508            remove_deprecated_tables,
509        );
510
511        let metrics = IndexStoreMetrics::new(registry);
512        let caches = IndexStoreCaches {
513            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
514            all_balances: ShardedLruCache::new(1_000_000, 1000),
515            locks: MutexTable::new(128),
516        };
517        let next_sequence_number = tables
518            .transaction_order
519            .reversed_safe_iter_with_bounds(None, None)
520            .expect("failed to initialize indexes")
521            .next()
522            .transpose()
523            .expect("failed to initialize indexes")
524            .map(|(seq, _)| seq + 1)
525            .unwrap_or(0)
526            .into();
527        let pruner_watermark_value = tables
528            .pruner_watermark
529            .get(&())
530            .expect("failed to initialize index tables")
531            .unwrap_or(0);
532        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
533
534        Self {
535            tables,
536            next_sequence_number,
537            caches,
538            metrics: Arc::new(metrics),
539            max_type_length: max_type_length.unwrap_or(128),
540            remove_deprecated_tables,
541            pruner_watermark,
542        }
543    }
544
545    pub fn new(
546        path: PathBuf,
547        registry: &Registry,
548        max_type_length: Option<u64>,
549        remove_deprecated_tables: bool,
550    ) -> Self {
551        let mut store =
552            Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
553        store.tables.init().unwrap();
554        store
555    }
556
    /// Returns a shared reference to the underlying index tables.
    pub fn tables(&self) -> &IndexStoreTables {
        &self.tables
    }
560
561    #[instrument(skip_all)]
562    pub fn index_coin(
563        &self,
564        digest: &TransactionDigest,
565        batch: &mut DBBatch,
566        object_index_changes: &ObjectIndexChanges,
567        tx_coins: Option<TxCoins>,
568    ) -> SuiResult<IndexStoreCacheUpdates> {
569        // In production if this code path is hit, we should expect `tx_coins` to not be None.
570        // However, in many tests today we do not distinguish validator and/or fullnode, so
571        // we gracefully exist here.
572        if tx_coins.is_none() {
573            return Ok(IndexStoreCacheUpdates::default());
574        }
575        // Acquire locks on changed coin owners
576        let mut addresses: HashSet<SuiAddress> = HashSet::new();
577        addresses.extend(
578            object_index_changes
579                .deleted_owners
580                .iter()
581                .map(|(owner, _)| *owner),
582        );
583        addresses.extend(
584            object_index_changes
585                .new_owners
586                .iter()
587                .map(|((owner, _), _)| *owner),
588        );
589        let _locks = self.caches.locks.acquire_locks(addresses.into_iter());
590        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
591            HashMap::new();
592        // Index coin info
593        let (input_coins, written_coins) = tx_coins.unwrap();
594
595        // 1. Remove old coins from the DB by looking at the set of input coin objects
596        let coin_delete_keys = input_coins
597            .values()
598            .filter_map(|object| {
599                // only process address owned coins
600                let Owner::AddressOwner(owner) = object.owner() else {
601                    return None;
602                };
603
604                // only process coin types
605                let (coin_type, coin) = object
606                    .coin_type_maybe()
607                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
608
609                let key = CoinIndexKey2::new(
610                    *owner,
611                    coin_type.to_string(),
612                    coin.balance.value(),
613                    object.id(),
614                );
615
616                let map = balance_changes.entry(*owner).or_default();
617                let entry = map.entry(coin_type).or_insert(TotalBalance {
618                    num_coins: 0,
619                    balance: 0,
620                });
621                entry.num_coins -= 1;
622                entry.balance -= coin.balance.value() as i128;
623
624                Some(key)
625            })
626            .collect::<Vec<_>>();
627        trace!(
628            tx_digset=?digest,
629            "coin_delete_keys: {:?}",
630            coin_delete_keys,
631        );
632        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
633
634        // 2. Insert new coins, or new versions of coins, by looking at `written_coins`.
635        let coin_add_keys = written_coins
636            .values()
637            .filter_map(|object| {
638                // only process address owned coins
639                let Owner::AddressOwner(owner) = object.owner() else {
640                    return None;
641                };
642
643                // only process coin types
644                let (coin_type, coin) = object
645                    .coin_type_maybe()
646                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
647
648                let key = CoinIndexKey2::new(
649                    *owner,
650                    coin_type.to_string(),
651                    coin.balance.value(),
652                    object.id(),
653                );
654                let value = CoinInfo {
655                    version: object.version(),
656                    digest: object.digest(),
657                    balance: coin.balance.value(),
658                    previous_transaction: object.previous_transaction,
659                };
660                let map = balance_changes.entry(*owner).or_default();
661                let entry = map.entry(coin_type).or_insert(TotalBalance {
662                    num_coins: 0,
663                    balance: 0,
664                });
665                entry.num_coins += 1;
666                entry.balance += coin.balance.value() as i128;
667
668                Some((key, value))
669            })
670            .collect::<Vec<_>>();
671        trace!(
672            tx_digset=?digest,
673            "coin_add_keys: {:?}",
674            coin_add_keys,
675        );
676
677        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
678
679        let per_coin_type_balance_changes: Vec<_> = balance_changes
680            .iter()
681            .flat_map(|(address, balance_map)| {
682                balance_map.iter().map(|(type_tag, balance)| {
683                    (
684                        (*address, type_tag.clone()),
685                        Ok::<TotalBalance, SuiError>(*balance),
686                    )
687                })
688            })
689            .collect();
690        let all_balance_changes: Vec<_> = balance_changes
691            .into_iter()
692            .map(|(address, balance_map)| {
693                (
694                    address,
695                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
696                )
697            })
698            .collect();
699        let cache_updates = IndexStoreCacheUpdates {
700            _locks,
701            per_coin_type_balance_changes,
702            all_balance_changes,
703        };
704        Ok(cache_updates)
705    }
706
707    #[instrument(skip_all)]
708    pub fn index_tx(
709        &self,
710        sender: SuiAddress,
711        active_inputs: impl Iterator<Item = ObjectID>,
712        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
713        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
714        events: &TransactionEvents,
715        object_index_changes: ObjectIndexChanges,
716        digest: &TransactionDigest,
717        timestamp_ms: u64,
718        tx_coins: Option<TxCoins>,
719    ) -> SuiResult<u64> {
720        let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
721        let mut batch = self.tables.transactions_from_addr.batch();
722
723        batch.insert_batch(
724            &self.tables.transaction_order,
725            std::iter::once((sequence, *digest)),
726        )?;
727
728        batch.insert_batch(
729            &self.tables.transactions_seq,
730            std::iter::once((*digest, sequence)),
731        )?;
732
733        batch.insert_batch(
734            &self.tables.transactions_from_addr,
735            std::iter::once(((sender, sequence), *digest)),
736        )?;
737
738        #[allow(deprecated)]
739        if !self.remove_deprecated_tables {
740            batch.insert_batch(
741                &self.tables.transactions_by_input_object_id,
742                active_inputs.map(|id| ((id, sequence), *digest)),
743            )?;
744
745            batch.insert_batch(
746                &self.tables.transactions_by_mutated_object_id,
747                mutated_objects
748                    .clone()
749                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
750            )?;
751        }
752
753        batch.insert_batch(
754            &self.tables.transactions_by_move_function,
755            move_functions
756                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
757        )?;
758
759        batch.insert_batch(
760            &self.tables.transactions_to_addr,
761            mutated_objects.filter_map(|(_, owner)| {
762                owner
763                    .get_address_owner_address()
764                    .ok()
765                    .map(|addr| ((addr, sequence), digest))
766            }),
767        )?;
768
769        // Coin Index
770        let cache_updates = self.index_coin(digest, &mut batch, &object_index_changes, tx_coins)?;
771
772        // Owner index
773        batch.delete_batch(
774            &self.tables.owner_index,
775            object_index_changes.deleted_owners.into_iter(),
776        )?;
777        batch.delete_batch(
778            &self.tables.dynamic_field_index,
779            object_index_changes.deleted_dynamic_fields.into_iter(),
780        )?;
781
782        batch.insert_batch(
783            &self.tables.owner_index,
784            object_index_changes.new_owners.into_iter(),
785        )?;
786
787        batch.insert_batch(
788            &self.tables.dynamic_field_index,
789            object_index_changes.new_dynamic_fields.into_iter(),
790        )?;
791
792        // events
793        let event_digest = events.digest();
794        batch.insert_batch(
795            &self.tables.event_order,
796            events
797                .data
798                .iter()
799                .enumerate()
800                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
801        )?;
802        batch.insert_batch(
803            &self.tables.event_by_move_module,
804            events
805                .data
806                .iter()
807                .enumerate()
808                .map(|(i, e)| {
809                    (
810                        i,
811                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
812                    )
813                })
814                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
815        )?;
816        batch.insert_batch(
817            &self.tables.event_by_sender,
818            events.data.iter().enumerate().map(|(i, e)| {
819                (
820                    (e.sender, (sequence, i)),
821                    (event_digest, *digest, timestamp_ms),
822                )
823            }),
824        )?;
825        batch.insert_batch(
826            &self.tables.event_by_move_event,
827            events.data.iter().enumerate().map(|(i, e)| {
828                (
829                    (e.type_.clone(), (sequence, i)),
830                    (event_digest, *digest, timestamp_ms),
831                )
832            }),
833        )?;
834
835        batch.insert_batch(
836            &self.tables.event_by_time,
837            events.data.iter().enumerate().map(|(i, _)| {
838                (
839                    (timestamp_ms, (sequence, i)),
840                    (event_digest, *digest, timestamp_ms),
841                )
842            }),
843        )?;
844
845        batch.insert_batch(
846            &self.tables.event_by_event_module,
847            events.data.iter().enumerate().map(|(i, e)| {
848                (
849                    (
850                        ModuleId::new(e.type_.address, e.type_.module.clone()),
851                        (sequence, i),
852                    ),
853                    (event_digest, *digest, timestamp_ms),
854                )
855            }),
856        )?;
857
858        let invalidate_caches =
859            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
860
861        if invalidate_caches {
862            // Invalidate cache before writing to db so we always serve latest values
863            self.invalidate_per_coin_type_cache(
864                cache_updates
865                    .per_coin_type_balance_changes
866                    .iter()
867                    .map(|x| x.0.clone()),
868            )?;
869            self.invalidate_all_balance_cache(
870                cache_updates.all_balance_changes.iter().map(|x| x.0),
871            )?;
872        }
873
874        batch.write()?;
875
876        if !invalidate_caches {
            // We cannot update the cache before updating the db, or else, on failing to write to
            // the db, we would update the cache twice. However, this only means the cache is
            // eventually consistent with the db (within a very short delay).
880            self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)?;
881            self.update_all_balance_cache(cache_updates.all_balance_changes)?;
882        }
883        Ok(sequence)
884    }
885
886    pub fn next_sequence_number(&self) -> TxSequenceNumber {
887        self.next_sequence_number.load(Ordering::SeqCst) + 1
888    }
889
890    #[instrument(skip(self))]
891    pub fn get_transactions(
892        &self,
893        filter: Option<TransactionFilter>,
894        cursor: Option<TransactionDigest>,
895        limit: Option<usize>,
896        reverse: bool,
897    ) -> SuiResult<Vec<TransactionDigest>> {
898        // Lookup TransactionDigest sequence number,
899        let cursor = if let Some(cursor) = cursor {
900            Some(
901                self.get_transaction_seq(&cursor)?
902                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
903            )
904        } else {
905            None
906        };
907        match filter {
908            Some(TransactionFilter::MoveFunction {
909                package,
910                module,
911                function,
912            }) => Ok(self.get_transactions_by_move_function(
913                package, module, function, cursor, limit, reverse,
914            )?),
915            Some(TransactionFilter::InputObject(object_id)) => {
916                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
917            }
918            Some(TransactionFilter::ChangedObject(object_id)) => {
919                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
920            }
921            Some(TransactionFilter::FromAddress(address)) => {
922                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
923            }
924            Some(TransactionFilter::ToAddress(address)) => {
925                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
926            }
927            // NOTE: filter via checkpoint sequence number is implemented in
928            // `get_transactions` of authority.rs.
929            Some(_) => Err(SuiErrorKind::UserInputError {
930                error: UserInputError::Unsupported(format!("{:?}", filter)),
931            }
932            .into()),
933            None => {
934                if reverse {
935                    let iter = self
936                        .tables
937                        .transaction_order
938                        .reversed_safe_iter_with_bounds(
939                            None,
940                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
941                        )?
942                        .skip(usize::from(cursor.is_some()))
943                        .map(|result| result.map(|(_, digest)| digest));
944                    if let Some(limit) = limit {
945                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
946                    } else {
947                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
948                    }
949                } else {
950                    let iter = self
951                        .tables
952                        .transaction_order
953                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
954                        .skip(usize::from(cursor.is_some()))
955                        .map(|result| result.map(|(_, digest)| digest));
956                    if let Some(limit) = limit {
957                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
958                    } else {
959                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
960                    }
961                }
962            }
963        }
964    }
965
966    #[instrument(skip_all)]
967    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
968        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
969        key: KeyT,
970        cursor: Option<TxSequenceNumber>,
971        limit: Option<usize>,
972        reverse: bool,
973    ) -> SuiResult<Vec<TransactionDigest>> {
974        Ok(if reverse {
975            let iter = index
976                .reversed_safe_iter_with_bounds(
977                    None,
978                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
979                )?
980                // skip one more if exclusive cursor is Some
981                .skip(usize::from(cursor.is_some()))
982                .take_while(|result| {
983                    result
984                        .as_ref()
985                        .map(|((id, _), _)| *id == key)
986                        .unwrap_or(false)
987                })
988                .map(|result| result.map(|(_, digest)| digest));
989            if let Some(limit) = limit {
990                iter.take(limit).collect::<Result<Vec<_>, _>>()?
991            } else {
992                iter.collect::<Result<Vec<_>, _>>()?
993            }
994        } else {
995            let iter = index
996                .safe_iter_with_bounds(
997                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
998                    None,
999                )
1000                // skip one more if exclusive cursor is Some
1001                .skip(usize::from(cursor.is_some()))
1002                .map(|result| result.expect("iterator db error"))
1003                .take_while(|((id, _), _)| *id == key)
1004                .map(|(_, digest)| digest);
1005            if let Some(limit) = limit {
1006                iter.take(limit).collect()
1007            } else {
1008                iter.collect()
1009            }
1010        })
1011    }
1012
1013    #[instrument(skip(self))]
1014    pub fn get_transactions_by_input_object(
1015        &self,
1016        input_object: ObjectID,
1017        cursor: Option<TxSequenceNumber>,
1018        limit: Option<usize>,
1019        reverse: bool,
1020    ) -> SuiResult<Vec<TransactionDigest>> {
1021        if self.remove_deprecated_tables {
1022            return Ok(vec![]);
1023        }
1024        #[allow(deprecated)]
1025        Self::get_transactions_from_index(
1026            &self.tables.transactions_by_input_object_id,
1027            input_object,
1028            cursor,
1029            limit,
1030            reverse,
1031        )
1032    }
1033
1034    #[instrument(skip(self))]
1035    pub fn get_transactions_by_mutated_object(
1036        &self,
1037        mutated_object: ObjectID,
1038        cursor: Option<TxSequenceNumber>,
1039        limit: Option<usize>,
1040        reverse: bool,
1041    ) -> SuiResult<Vec<TransactionDigest>> {
1042        if self.remove_deprecated_tables {
1043            return Ok(vec![]);
1044        }
1045        #[allow(deprecated)]
1046        Self::get_transactions_from_index(
1047            &self.tables.transactions_by_mutated_object_id,
1048            mutated_object,
1049            cursor,
1050            limit,
1051            reverse,
1052        )
1053    }
1054
1055    #[instrument(skip(self))]
1056    pub fn get_transactions_from_addr(
1057        &self,
1058        addr: SuiAddress,
1059        cursor: Option<TxSequenceNumber>,
1060        limit: Option<usize>,
1061        reverse: bool,
1062    ) -> SuiResult<Vec<TransactionDigest>> {
1063        Self::get_transactions_from_index(
1064            &self.tables.transactions_from_addr,
1065            addr,
1066            cursor,
1067            limit,
1068            reverse,
1069        )
1070    }
1071
1072    #[instrument(skip(self))]
1073    pub fn get_transactions_by_move_function(
1074        &self,
1075        package: ObjectID,
1076        module: Option<String>,
1077        function: Option<String>,
1078        cursor: Option<TxSequenceNumber>,
1079        limit: Option<usize>,
1080        reverse: bool,
1081    ) -> SuiResult<Vec<TransactionDigest>> {
1082        // If we are passed a function with no module return a UserInputError
1083        if function.is_some() && module.is_none() {
1084            return Err(SuiErrorKind::UserInputError {
1085                error: UserInputError::MoveFunctionInputError(
1086                    "Cannot supply function without supplying module".to_string(),
1087                ),
1088            }
1089            .into());
1090        }
1091
1092        // We cannot have a cursor without filling out the other keys.
1093        if cursor.is_some() && (module.is_none() || function.is_none()) {
1094            return Err(SuiErrorKind::UserInputError {
1095                error: UserInputError::MoveFunctionInputError(
1096                    "Cannot supply cursor without supplying module and function".to_string(),
1097                ),
1098            }
1099            .into());
1100        }
1101
1102        let cursor_val = cursor.unwrap_or(if reverse {
1103            TxSequenceNumber::MAX
1104        } else {
1105            TxSequenceNumber::MIN
1106        });
1107
1108        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1109        let module_val = module.clone().unwrap_or(if reverse {
1110            max_string.clone()
1111        } else {
1112            "".to_string()
1113        });
1114
1115        let function_val =
1116            function
1117                .clone()
1118                .unwrap_or(if reverse { max_string } else { "".to_string() });
1119
1120        let key = (package, module_val, function_val, cursor_val);
1121        Ok(if reverse {
1122            let iter = self
1123                .tables
1124                .transactions_by_move_function
1125                .reversed_safe_iter_with_bounds(None, Some(key))?
1126                // skip one more if exclusive cursor is Some
1127                .skip(usize::from(cursor.is_some()))
1128                .take_while(|result| {
1129                    result
1130                        .as_ref()
1131                        .map(|((id, m, f, _), _)| {
1132                            *id == package
1133                                && module.as_ref().map(|x| x == m).unwrap_or(true)
1134                                && function.as_ref().map(|x| x == f).unwrap_or(true)
1135                        })
1136                        .unwrap_or(false)
1137                })
1138                .map(|result| result.map(|(_, digest)| digest));
1139            if let Some(limit) = limit {
1140                iter.take(limit).collect::<Result<Vec<_>, _>>()?
1141            } else {
1142                iter.collect::<Result<Vec<_>, _>>()?
1143            }
1144        } else {
1145            let iter = self
1146                .tables
1147                .transactions_by_move_function
1148                .safe_iter_with_bounds(Some(key), None)
1149                .map(|result| result.expect("iterator db error"))
1150                // skip one more if exclusive cursor is Some
1151                .skip(usize::from(cursor.is_some()))
1152                .take_while(|((id, m, f, _), _)| {
1153                    *id == package
1154                        && module.as_ref().map(|x| x == m).unwrap_or(true)
1155                        && function.as_ref().map(|x| x == f).unwrap_or(true)
1156                })
1157                .map(|(_, digest)| digest);
1158            if let Some(limit) = limit {
1159                iter.take(limit).collect()
1160            } else {
1161                iter.collect()
1162            }
1163        })
1164    }
1165
1166    #[instrument(skip(self))]
1167    pub fn get_transactions_to_addr(
1168        &self,
1169        addr: SuiAddress,
1170        cursor: Option<TxSequenceNumber>,
1171        limit: Option<usize>,
1172        reverse: bool,
1173    ) -> SuiResult<Vec<TransactionDigest>> {
1174        Self::get_transactions_from_index(
1175            &self.tables.transactions_to_addr,
1176            addr,
1177            cursor,
1178            limit,
1179            reverse,
1180        )
1181    }
1182
1183    #[instrument(skip(self))]
1184    pub fn get_transaction_seq(
1185        &self,
1186        digest: &TransactionDigest,
1187    ) -> SuiResult<Option<TxSequenceNumber>> {
1188        Ok(self.tables.transactions_seq.get(digest)?)
1189    }
1190
1191    #[instrument(skip(self))]
1192    pub fn all_events(
1193        &self,
1194        tx_seq: TxSequenceNumber,
1195        event_seq: usize,
1196        limit: usize,
1197        descending: bool,
1198    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1199        Ok(if descending {
1200            self.tables
1201                .event_order
1202                .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1203                .take(limit)
1204                .map(|result| {
1205                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1206                        (digest, tx_digest, event_seq, time)
1207                    })
1208                })
1209                .collect::<Result<Vec<_>, _>>()?
1210        } else {
1211            self.tables
1212                .event_order
1213                .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1214                .take(limit)
1215                .map(|result| {
1216                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1217                        (digest, tx_digest, event_seq, time)
1218                    })
1219                })
1220                .collect::<Result<Vec<_>, _>>()?
1221        })
1222    }
1223
1224    #[instrument(skip(self))]
1225    pub fn events_by_transaction(
1226        &self,
1227        digest: &TransactionDigest,
1228        tx_seq: TxSequenceNumber,
1229        event_seq: usize,
1230        limit: usize,
1231        descending: bool,
1232    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1233        let seq = self
1234            .get_transaction_seq(digest)?
1235            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1236        Ok(if descending {
1237            self.tables
1238                .event_order
1239                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1240                .take_while(|result| {
1241                    result
1242                        .as_ref()
1243                        .map(|((tx, _), _)| tx == &seq)
1244                        .unwrap_or(false)
1245                })
1246                .take(limit)
1247                .map(|result| {
1248                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
1249                        (digest, tx_digest, event_seq, time)
1250                    })
1251                })
1252                .collect::<Result<Vec<_>, _>>()?
1253        } else {
1254            self.tables
1255                .event_order
1256                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1257                .map(|result| result.expect("iterator db error"))
1258                .take_while(|((tx, _), _)| tx == &seq)
1259                .take(limit)
1260                .map(|((_, event_seq), (digest, tx_digest, time))| {
1261                    (digest, tx_digest, event_seq, time)
1262                })
1263                .collect()
1264        })
1265    }
1266
1267    #[instrument(skip_all)]
1268    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1269        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1270        key: &KeyT,
1271        tx_seq: TxSequenceNumber,
1272        event_seq: usize,
1273        limit: usize,
1274        descending: bool,
1275    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1276        Ok(if descending {
1277            index
1278                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1279                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1280                .take(limit)
1281                .map(|result| {
1282                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1283                        (digest, tx_digest, event_seq, time)
1284                    })
1285                })
1286                .collect::<Result<Vec<_>, _>>()?
1287        } else {
1288            index
1289                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1290                .map(|result| result.expect("iterator db error"))
1291                .take_while(|((m, _), _)| m == key)
1292                .take(limit)
1293                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1294                    (digest, tx_digest, event_seq, time)
1295                })
1296                .collect()
1297        })
1298    }
1299
1300    #[instrument(skip(self))]
1301    pub fn events_by_module_id(
1302        &self,
1303        module: &ModuleId,
1304        tx_seq: TxSequenceNumber,
1305        event_seq: usize,
1306        limit: usize,
1307        descending: bool,
1308    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1309        Self::get_event_from_index(
1310            &self.tables.event_by_move_module,
1311            module,
1312            tx_seq,
1313            event_seq,
1314            limit,
1315            descending,
1316        )
1317    }
1318
1319    #[instrument(skip(self))]
1320    pub fn events_by_move_event_struct_name(
1321        &self,
1322        struct_name: &StructTag,
1323        tx_seq: TxSequenceNumber,
1324        event_seq: usize,
1325        limit: usize,
1326        descending: bool,
1327    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1328        Self::get_event_from_index(
1329            &self.tables.event_by_move_event,
1330            struct_name,
1331            tx_seq,
1332            event_seq,
1333            limit,
1334            descending,
1335        )
1336    }
1337
1338    #[instrument(skip(self))]
1339    pub fn events_by_move_event_module(
1340        &self,
1341        module_id: &ModuleId,
1342        tx_seq: TxSequenceNumber,
1343        event_seq: usize,
1344        limit: usize,
1345        descending: bool,
1346    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1347        Self::get_event_from_index(
1348            &self.tables.event_by_event_module,
1349            module_id,
1350            tx_seq,
1351            event_seq,
1352            limit,
1353            descending,
1354        )
1355    }
1356
1357    #[instrument(skip(self))]
1358    pub fn events_by_sender(
1359        &self,
1360        sender: &SuiAddress,
1361        tx_seq: TxSequenceNumber,
1362        event_seq: usize,
1363        limit: usize,
1364        descending: bool,
1365    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1366        Self::get_event_from_index(
1367            &self.tables.event_by_sender,
1368            sender,
1369            tx_seq,
1370            event_seq,
1371            limit,
1372            descending,
1373        )
1374    }
1375
1376    #[instrument(skip(self))]
1377    pub fn event_iterator(
1378        &self,
1379        start_time: u64,
1380        end_time: u64,
1381        tx_seq: TxSequenceNumber,
1382        event_seq: usize,
1383        limit: usize,
1384        descending: bool,
1385    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1386        Ok(if descending {
1387            self.tables
1388                .event_by_time
1389                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1390                .take_while(|result| {
1391                    result
1392                        .as_ref()
1393                        .map(|((m, _), _)| m >= &start_time)
1394                        .unwrap_or(false)
1395                })
1396                .take(limit)
1397                .map(|result| {
1398                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1399                        (digest, tx_digest, event_seq, time)
1400                    })
1401                })
1402                .collect::<Result<Vec<_>, _>>()?
1403        } else {
1404            self.tables
1405                .event_by_time
1406                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1407                .map(|result| result.expect("iterator db error"))
1408                .take_while(|((m, _), _)| m <= &end_time)
1409                .take(limit)
1410                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1411                    (digest, tx_digest, event_seq, time)
1412                })
1413                .collect()
1414        })
1415    }
1416
1417    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1418        match self
1419            .tables
1420            .event_by_time
1421            .reversed_safe_iter_with_bounds(
1422                None,
1423                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1424            )?
1425            .next()
1426            .transpose()?
1427        {
1428            Some(((_, (watermark, _)), _)) => {
1429                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1430                    info!(
1431                        "json rpc index pruning. Watermark is {} with digest {}",
1432                        watermark, digest
1433                    );
1434                }
1435                self.pruner_watermark.store(watermark, Ordering::Relaxed);
1436                self.tables.pruner_watermark.insert(&(), &watermark)?;
1437                Ok(watermark)
1438            }
1439            None => Ok(0),
1440        }
1441    }
1442
1443    pub fn get_dynamic_fields_iterator(
1444        &self,
1445        object: ObjectID,
1446        cursor: Option<ObjectID>,
1447    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1448    {
1449        debug!(?object, "get_dynamic_fields");
1450        // The object id 0 is the smallest possible
1451        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
1452        let iter_upper_bound = (object, ObjectID::MAX);
1453        Ok(self
1454            .tables
1455            .dynamic_field_index
1456            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
1457            // skip an extra b/c the cursor is exclusive
1458            .skip(usize::from(cursor.is_some()))
1459            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
1460            .map_ok(|((_, c), object_info)| (c, object_info)))
1461    }
1462
1463    #[instrument(skip(self))]
1464    pub fn get_dynamic_field_object_id(
1465        &self,
1466        object: ObjectID,
1467        name_type: TypeTag,
1468        name_bcs_bytes: &[u8],
1469    ) -> SuiResult<Option<ObjectID>> {
1470        debug!(?object, "get_dynamic_field_object_id");
1471        let dynamic_field_id =
1472            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1473                |e| {
1474                    SuiErrorKind::Unknown(format!(
1475                        "Unable to generate dynamic field id. Got error: {e:?}"
1476                    ))
1477                },
1478            )?;
1479
1480        if let Some(info) = self
1481            .tables
1482            .dynamic_field_index
1483            .get(&(object, dynamic_field_id))?
1484        {
1485            // info.object_id != dynamic_field_id ==> is_wrapper
1486            debug_assert!(
1487                info.object_id == dynamic_field_id
1488                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1489            );
1490            return Ok(Some(info.object_id));
1491        }
1492
1493        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1494        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1495        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1496            object,
1497            &dynamic_object_field_type,
1498            name_bcs_bytes,
1499        )
1500        .map_err(|e| {
1501            SuiErrorKind::Unknown(format!(
1502                "Unable to generate dynamic field id. Got error: {e:?}"
1503            ))
1504        })?;
1505        if let Some(info) = self
1506            .tables
1507            .dynamic_field_index
1508            .get(&(object, dynamic_object_field_id))?
1509        {
1510            return Ok(Some(info.object_id));
1511        }
1512
1513        Ok(None)
1514    }
1515
1516    #[instrument(skip(self))]
1517    pub fn get_owner_objects(
1518        &self,
1519        owner: SuiAddress,
1520        cursor: Option<ObjectID>,
1521        limit: usize,
1522        filter: Option<SuiObjectDataFilter>,
1523    ) -> SuiResult<Vec<ObjectInfo>> {
1524        let cursor = match cursor {
1525            Some(cursor) => cursor,
1526            None => ObjectID::ZERO,
1527        };
1528        Ok(self
1529            .get_owner_objects_iterator(owner, cursor, filter)?
1530            .take(limit)
1531            .collect())
1532    }
1533
1534    pub fn get_owned_coins_iterator(
1535        coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1536        owner: SuiAddress,
1537        coin_type_tag: Option<String>,
1538    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1539        let all_coins = coin_type_tag.is_none();
1540        let starting_coin_type =
1541            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1542        let start_key =
1543            CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1544        Ok(coin_index
1545            .safe_iter_with_bounds(Some(start_key), None)
1546            .map(|result| result.expect("iterator db error"))
1547            .take_while(move |(key, _)| {
1548                if key.owner != owner {
1549                    return false;
1550                }
1551                if !all_coins && starting_coin_type != key.coin_type {
1552                    return false;
1553                }
1554                true
1555            }))
1556    }
1557
1558    pub fn get_owned_coins_iterator_with_cursor(
1559        &self,
1560        owner: SuiAddress,
1561        cursor: (String, u64, ObjectID),
1562        limit: usize,
1563        one_coin_type_only: bool,
1564    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1565        let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1566        let start_key = CoinIndexKey2::new_from_cursor(
1567            owner,
1568            starting_coin_type.clone(),
1569            inverted_balance,
1570            starting_object_id,
1571        );
1572        Ok(self
1573            .tables
1574            .coin_index_2
1575            .safe_iter_with_bounds(Some(start_key), None)
1576            .map(|result| result.expect("iterator db error"))
1577            .filter(move |(key, _)| key.object_id != starting_object_id)
1578            .enumerate()
1579            .take_while(move |(index, (key, _))| {
1580                if *index >= limit {
1581                    return false;
1582                }
1583                if key.owner != owner {
1584                    return false;
1585                }
1586                if one_coin_type_only && starting_coin_type != key.coin_type {
1587                    return false;
1588                }
1589                true
1590            })
1591            .map(|(_index, (key, info))| (key, info)))
1592    }
1593
1594    /// starting_object_id can be used to implement pagination, where a client remembers the last
1595    /// object id of each page, and use it to query the next page.
1596    pub fn get_owner_objects_iterator(
1597        &self,
1598        owner: SuiAddress,
1599        starting_object_id: ObjectID,
1600        filter: Option<SuiObjectDataFilter>,
1601    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1602        Ok(self
1603            .tables
1604            .owner_index
1605            // The object id 0 is the smallest possible
1606            .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1607            .map(|result| result.expect("iterator db error"))
1608            .skip(usize::from(starting_object_id != ObjectID::ZERO))
1609            .take_while(move |((address_owner, _), _)| address_owner == &owner)
1610            .filter(move |(_, o)| {
1611                if let Some(filter) = filter.as_ref() {
1612                    filter.matches(o)
1613                } else {
1614                    true
1615                }
1616            })
1617            .map(|(_, object_info)| object_info))
1618    }
1619
1620    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
1621        let mut batch = self.tables.owner_index.batch();
1622        batch.insert_batch(
1623            &self.tables.owner_index,
1624            object_index_changes.new_owners.into_iter(),
1625        )?;
1626        batch.insert_batch(
1627            &self.tables.dynamic_field_index,
1628            object_index_changes.new_dynamic_fields.into_iter(),
1629        )?;
1630        batch.write()?;
1631        Ok(())
1632    }
1633
1634    pub fn is_empty(&self) -> bool {
1635        self.tables.owner_index.is_empty()
1636    }
1637
1638    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1639        // We are checkpointing the whole db
1640        self.tables
1641            .transactions_from_addr
1642            .checkpoint_db(path)
1643            .map_err(Into::into)
1644    }
1645
1646    /// This method first gets the balance from `per_coin_type_balance` cache. On a cache miss, it
1647    /// gets the balance for passed in `coin_type` from the `all_balance` cache. Only on the second
1648    /// cache miss, we go to the database (expensive) and update the cache. Notice that db read is
1649    /// done with `spawn_blocking` as that is expected to block
1650    #[instrument(skip(self))]
1651    pub fn get_balance(&self, owner: SuiAddress, coin_type: TypeTag) -> SuiResult<TotalBalance> {
1652        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1653        let cloned_coin_type = coin_type.clone();
1654        let metrics_cloned = self.metrics.clone();
1655        let coin_index_cloned = self.tables.coin_index_2.clone();
1656        if force_disable_cache {
1657            Self::get_balance_from_db(metrics_cloned, coin_index_cloned, owner, cloned_coin_type)
1658                .map_err(|e| {
1659                SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1660            })?;
1661        }
1662
1663        self.metrics.balance_lookup_from_total.inc();
1664
1665        let balance = self
1666            .caches
1667            .per_coin_type_balance
1668            .get(&(owner, coin_type.clone()));
1669        if let Some(balance) = balance {
1670            return balance;
1671        }
1672        // cache miss, lookup in all balance cache
1673        let all_balance = self.caches.all_balances.get(&owner.clone());
1674        if let Some(Ok(all_balance)) = all_balance
1675            && let Some(balance) = all_balance.get(&coin_type)
1676        {
1677            return Ok(*balance);
1678        }
1679        let cloned_coin_type = coin_type.clone();
1680        let metrics_cloned = self.metrics.clone();
1681        let coin_index_cloned = self.tables.coin_index_2.clone();
1682        self.caches
1683            .per_coin_type_balance
1684            .get_with((owner, coin_type), move || {
1685                Self::get_balance_from_db(
1686                    metrics_cloned,
1687                    coin_index_cloned,
1688                    owner,
1689                    cloned_coin_type,
1690                )
1691                .map_err(|e| {
1692                    SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1693                        .into()
1694                })
1695            })
1696    }
1697
1698    /// This method gets the balance for all coin types from the `all_balance` cache. On a cache miss,
1699    /// we go to the database (expensive) and update the cache. This cache is dual purpose in the
1700    /// sense that it not only serves `get_AllBalance()` calls but is also used for serving
1701    /// `get_Balance()` queries. Notice that db read is performed with `spawn_blocking` as that is
1702    /// expected to block
1703    #[instrument(skip(self))]
1704    pub fn get_all_balance(
1705        &self,
1706        owner: SuiAddress,
1707    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1708        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1709        let metrics_cloned = self.metrics.clone();
1710        let coin_index_cloned = self.tables.coin_index_2.clone();
1711        if force_disable_cache {
1712            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(
1713                |e| {
1714                    SuiErrorKind::ExecutionError(format!(
1715                        "Failed to read all balance from DB: {:?}",
1716                        e
1717                    ))
1718                },
1719            )?;
1720        }
1721
1722        self.metrics.all_balance_lookup_from_total.inc();
1723        let metrics_cloned = self.metrics.clone();
1724        let coin_index_cloned = self.tables.coin_index_2.clone();
1725        self.caches.all_balances.get_with(owner, move || {
1726            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
1727                SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
1728                    .into()
1729            })
1730        })
1731    }
1732
1733    /// Read balance for a `SuiAddress` and `CoinType` from the backend database
1734    #[instrument(skip_all)]
1735    pub fn get_balance_from_db(
1736        metrics: Arc<IndexStoreMetrics>,
1737        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1738        owner: SuiAddress,
1739        coin_type: TypeTag,
1740    ) -> SuiResult<TotalBalance> {
1741        metrics.balance_lookup_from_db.inc();
1742        let coin_type_str = coin_type.to_string();
1743        let coins =
1744            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
1745
1746        let mut balance = 0i128;
1747        let mut num_coins = 0;
1748        for (_key, coin_info) in coins {
1749            balance += coin_info.balance as i128;
1750            num_coins += 1;
1751        }
1752        Ok(TotalBalance { balance, num_coins })
1753    }
1754
1755    /// Read all balances for a `SuiAddress` from the backend database
1756    #[instrument(skip_all)]
1757    pub fn get_all_balances_from_db(
1758        metrics: Arc<IndexStoreMetrics>,
1759        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1760        owner: SuiAddress,
1761    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1762        metrics.all_balance_lookup_from_db.inc();
1763        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1764        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1765            .chunk_by(|(key, _coin)| key.coin_type.clone());
1766        for (coin_type, coins) in &coins {
1767            let mut total_balance = 0i128;
1768            let mut coin_object_count = 0;
1769            for (_, coin_info) in coins {
1770                total_balance += coin_info.balance as i128;
1771                coin_object_count += 1;
1772            }
1773            let coin_type =
1774                TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
1775                    SuiErrorKind::ExecutionError(format!(
1776                        "Failed to parse event sender address: {:?}",
1777                        e
1778                    ))
1779                })?));
1780            balances.insert(
1781                coin_type,
1782                TotalBalance {
1783                    num_coins: coin_object_count,
1784                    balance: total_balance,
1785                },
1786            );
1787        }
1788        Ok(Arc::new(balances))
1789    }
1790
1791    fn invalidate_per_coin_type_cache(
1792        &self,
1793        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
1794    ) -> SuiResult {
1795        self.caches.per_coin_type_balance.batch_invalidate(keys);
1796        Ok(())
1797    }
1798
1799    fn invalidate_all_balance_cache(
1800        &self,
1801        addresses: impl IntoIterator<Item = SuiAddress>,
1802    ) -> SuiResult {
1803        self.caches.all_balances.batch_invalidate(addresses);
1804        Ok(())
1805    }
1806
1807    fn update_per_coin_type_cache(
1808        &self,
1809        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
1810    ) -> SuiResult {
1811        self.caches
1812            .per_coin_type_balance
1813            .batch_merge(keys, Self::merge_balance);
1814        Ok(())
1815    }
1816
1817    fn merge_balance(
1818        old_balance: &SuiResult<TotalBalance>,
1819        balance_delta: &SuiResult<TotalBalance>,
1820    ) -> SuiResult<TotalBalance> {
1821        if let Ok(old_balance) = old_balance {
1822            if let Ok(balance_delta) = balance_delta {
1823                Ok(TotalBalance {
1824                    balance: old_balance.balance + balance_delta.balance,
1825                    num_coins: old_balance.num_coins + balance_delta.num_coins,
1826                })
1827            } else {
1828                balance_delta.clone()
1829            }
1830        } else {
1831            old_balance.clone()
1832        }
1833    }
1834
1835    fn update_all_balance_cache(
1836        &self,
1837        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1838    ) -> SuiResult {
1839        self.caches
1840            .all_balances
1841            .batch_merge(keys, Self::merge_all_balance);
1842        Ok(())
1843    }
1844
1845    fn merge_all_balance(
1846        old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1847        balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1848    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1849        if let Ok(old_balance) = old_balance {
1850            if let Ok(balance_delta) = balance_delta {
1851                let mut new_balance = HashMap::new();
1852                for (key, value) in old_balance.iter() {
1853                    new_balance.insert(key.clone(), *value);
1854                }
1855                for (key, delta) in balance_delta.iter() {
1856                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1857                        balance: 0,
1858                        num_coins: 0,
1859                    });
1860                    let new_total = TotalBalance {
1861                        balance: old.balance + delta.balance,
1862                        num_coins: old.num_coins + delta.num_coins,
1863                    };
1864                    new_balance.insert(key.clone(), new_total);
1865                }
1866                Ok(Arc::new(new_balance))
1867            } else {
1868                balance_delta.clone()
1869            }
1870        } else {
1871            old_balance.clone()
1872        }
1873    }
1874}
1875
1876#[cfg(test)]
1877mod tests {
1878    use super::IndexStore;
1879    use super::ObjectIndexChanges;
1880    use move_core_types::account_address::AccountAddress;
1881    use prometheus::Registry;
1882    use std::collections::BTreeMap;
1883    use std::env::temp_dir;
1884    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
1885    use sui_types::digests::TransactionDigest;
1886    use sui_types::effects::TransactionEvents;
1887    use sui_types::gas_coin::GAS;
1888    use sui_types::object;
1889    use sui_types::object::Owner;
1890
    #[tokio::test]
    async fn test_index_cache() -> anyhow::Result<()> {
        // This test invokes `index_tx()` to add 10 coins, each with balance 100, to an address.
        // The balance is then read from both the db and the cache; it should be 1000. Next, 3
        // of those coins are deleted from the address, `index_tx()` is invoked again and the
        // balance is read once more. It should now be 700, verified from both the db and the
        // cache.
        // This test makes sure that cache entries are invalidated and that the latest balance
        // is always read.
        let index_store =
            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
        let address: SuiAddress = AccountAddress::random().into();
        let mut written_objects = BTreeMap::new();
        let mut input_objects = BTreeMap::new();
        let mut object_map = BTreeMap::new();

        // Step 1: create 10 gas coins of balance 100 owned by `address`.
        let mut new_objects = vec![];
        for _i in 0..10 {
            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
            new_objects.push((
                (address, object.id()),
                ObjectInfo {
                    object_id: object.id(),
                    version: object.version(),
                    digest: object.digest(),
                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
                    owner: Owner::AddressOwner(address),
                    previous_transaction: object.previous_transaction,
                },
            ));
            object_map.insert(object.id(), object.clone());
            written_objects.insert(object.data.id(), object);
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners: new_objects,
            new_dynamic_fields: vec![],
        };

        // No input coins yet; all 10 coins appear as written objects.
        let tx_coins = (input_objects.clone(), written_objects.clone());
        index_store.index_tx(
            address,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            object_index_changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
        )?;

        // Step 2: the db and the cached balance must agree on 10 coins totalling 1000.
        let balance_from_db = IndexStore::get_balance_from_db(
            index_store.metrics.clone(),
            index_store.tables.coin_index_2.clone(),
            address,
            GAS::type_tag(),
        )?;
        let balance = index_store.get_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        let all_balance = index_store.get_all_balance(address)?;
        let balance = all_balance.get(&GAS::type_tag()).unwrap();
        assert_eq!(*balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        // Step 3: delete 3 of the coins; they become input objects with no written
        // counterpart.
        written_objects.clear();
        let mut deleted_objects = vec![];
        for (id, object) in object_map.iter().take(3) {
            deleted_objects.push((address, *id));
            input_objects.insert(*id, object.to_owned());
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: deleted_objects.clone(),
            deleted_dynamic_fields: vec![],
            new_owners: vec![],
            new_dynamic_fields: vec![],
        };
        let tx_coins = (input_objects, written_objects);
        index_store.index_tx(
            address,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            object_index_changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
        )?;
        // Step 4: both the db and the cache must now report 7 coins totalling 700.
        let balance_from_db = IndexStore::get_balance_from_db(
            index_store.metrics.clone(),
            index_store.tables.coin_index_2.clone(),
            address,
            GAS::type_tag(),
        )?;
        let balance = index_store.get_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);
        // Invalidate the per coin type balance cache and read from the all balance cache to
        // ensure the balance matches
        index_store
            .caches
            .per_coin_type_balance
            .invalidate(&(address, GAS::type_tag()));
        let all_balance = index_store.get_all_balance(address)?;
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
        let balance = index_store.get_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);

        Ok(())
    }
2011
    // Checks that querying `transactions_by_move_function` for one package and module gives
    // the same entries in both directions: the first query's results, reversed, must equal the
    // second query's results. The two queries differ only in their final argument (`false` vs
    // `true`) — presumably the descending-order flag; confirm against the method signature.
    #[tokio::test]
    async fn test_get_transaction_by_move_function() {
        use sui_types::base_types::ObjectID;
        use typed_store::Map;

        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
        let db = &index_store.tables.transactions_by_move_function;
        // Four entries under the same package and module. The function names are a short "f"
        // and three 128-character names made of 'Z', 'f' and 'z', so the keys vary in both
        // length and letter case.
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "f".to_string(),
                0,
            ),
            &[0; 32].into(),
        )
        .unwrap();
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "Z".repeat(128),
                0,
            ),
            &[1; 32].into(),
        )
        .unwrap();
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "f".repeat(128),
                0,
            ),
            &[2; 32].into(),
        )
        .unwrap();
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "z".repeat(128),
                0,
            ),
            &[3; 32].into(),
        )
        .unwrap();

        // Query by package and module only, once with the final flag `false`...
        let mut v = index_store
            .get_transactions_by_move_function(
                ObjectID::new([1; 32]),
                Some("mod".to_string()),
                None,
                None,
                None,
                false,
            )
            .unwrap();
        // ...and once with it set to `true`.
        let v_rev = index_store
            .get_transactions_by_move_function(
                ObjectID::new([1; 32]),
                Some("mod".to_string()),
                None,
                None,
                None,
                true,
            )
            .unwrap();
        // The two result lists must be exact mirrors of each other.
        v.reverse();
        assert_eq!(v, v_rev);
    }
2083}