1use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use mysten_common::ZipDebugEqIteratorExt;
17use parking_lot::ArcMutexGuard;
18use prometheus::{
19 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
20 register_int_counter_with_registry,
21};
22use serde::{Deserialize, Serialize, de::DeserializeOwned};
23use sui_types::accumulator_event::AccumulatorEvent;
24use typed_store::TypedStoreError;
25use typed_store::rocksdb::compaction_filter::Decision;
26
27use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
28use sui_storage::mutex_table::MutexTable;
29use sui_storage::sharded_lru::ShardedLruCache;
30use sui_types::base_types::{
31 ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
32};
33use sui_types::base_types::{ObjectInfo, ObjectRef};
34use sui_types::digests::TransactionEventsDigest;
35use sui_types::dynamic_field::{self, DynamicFieldInfo};
36use sui_types::effects::TransactionEvents;
37use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
38use sui_types::inner_temporary_store::TxCoins;
39use sui_types::object::{Object, Owner};
40use sui_types::parse_sui_struct_tag;
41use sui_types::storage::error::Error as StorageError;
42use tracing::{debug, info, instrument, trace};
43use typed_store::DBMapUtils;
44use typed_store::rocks::{
45 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, StagedBatch, default_db_options,
46 read_size_from_env,
47};
48use typed_store::traits::Map;
49
/// Key of the owner index: (owner address, owned object id).
type OwnerIndexKey = (SuiAddress, ObjectID);
/// Key of the dynamic-field index: (parent object id, field object id).
type DynamicFieldKey = (ObjectID, ObjectID);
/// Identifies one event: (transaction sequence number, event index within the tx).
type EventId = (TxSequenceNumber, usize);
/// Value stored by the event tables: (events digest, tx digest, timestamp in ms —
/// see the inserts in `index_tx`).
type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
/// Full balance map of one address, keyed by coin type.
type AllBalance = HashMap<TypeTag, TotalBalance>;
55
/// Key of the `coin_index_2` table.
///
/// The derived `Ord` sorts by (owner, coin_type, inverted_balance, object_id);
/// because `inverted_balance` is the bitwise NOT of the coin's balance (see
/// `CoinIndexKey2::new`), iteration yields an owner's coins of a given type in
/// descending balance order.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CoinIndexKey2 {
    pub owner: SuiAddress,
    pub coin_type: String,
    // Bitwise NOT of the balance, so larger balances sort first.
    pub inverted_balance: u64,
    pub object_id: ObjectID,
}
65
66impl CoinIndexKey2 {
67 pub fn new_from_cursor(
68 owner: SuiAddress,
69 coin_type: String,
70 inverted_balance: u64,
71 object_id: ObjectID,
72 ) -> Self {
73 Self {
74 owner,
75 coin_type,
76 inverted_balance,
77 object_id,
78 }
79 }
80
81 pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
82 Self {
83 owner,
84 coin_type,
85 inverted_balance: !balance,
86 object_id,
87 }
88 }
89}
90
// Overall database layout version written into the `meta` table by `init`.
const CURRENT_DB_VERSION: u64 = 0;
// Coin index schema version; the leading underscore marks it unused in this
// chunk — presumably kept for future migrations.
const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
93
/// Singleton record stored under the unit key in the `meta` table, describing
/// the database layout.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Overall database version (see `CURRENT_DB_VERSION`).
    version: u64,
    /// Per-column-family versions, keyed by column family name.
    column_families: BTreeMap<String, ColumnFamilyInfo>,
}
104
/// Version stamp of a single column family, stored inside `MetadataInfo`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ColumnFamilyInfo {
    version: u64,
}
109
// Maximum span of a transaction range query. NOTE(review): not referenced in
// this chunk — presumably enforced by query handlers elsewhere in the file.
pub const MAX_TX_RANGE_SIZE: u64 = 4096;

// Maximum number of owned objects returned by a single lookup. NOTE(review):
// also enforced elsewhere; not referenced in this chunk.
pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
// Block-cache size (MB) for the coin index; read in `coin_index_table_default_config`.
const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
// Presumably disables the in-memory balance caches; not read in this chunk.
const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
// When set non-zero, `commit_index_batch` invalidates cache entries instead of
// updating them in place.
const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
116
/// Aggregated coin holdings for one (address, coin type) pair. Also used as a
/// signed delta while indexing (see `index_coin`), hence the signed types.
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
pub struct TotalBalance {
    /// Sum of coin balances; signed so intermediate deltas can go negative.
    pub balance: i128,
    /// Number of coin objects; signed for the same reason.
    pub num_coins: i64,
    // NOTE(review): never written in this chunk (index_coin leaves it 0) —
    // presumably the accumulator-based balance; confirm against call sites.
    pub address_balance: u64,
}
123
/// Owner/dynamic-field index mutations derived from one transaction's effects,
/// applied to the tables by `index_tx` (deletions before insertions).
#[derive(Debug)]
pub struct ObjectIndexChanges {
    /// (owner, object) entries to remove from the owner index.
    pub deleted_owners: Vec<OwnerIndexKey>,
    /// (parent, field) entries to remove from the dynamic-field index.
    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
    /// Entries to add to the owner index.
    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
    /// Entries to add to the dynamic-field index.
    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
}
131
/// Value stored in the coin index: enough object metadata to answer coin
/// queries without fetching the object itself.
#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct CoinInfo {
    pub version: SequenceNumber,
    pub digest: ObjectDigest,
    pub balance: u64,
    pub previous_transaction: TransactionDigest,
}
139
140impl CoinInfo {
141 pub fn from_object(object: &Object) -> Option<CoinInfo> {
142 object.as_coin_maybe().map(|coin| CoinInfo {
143 version: object.version(),
144 digest: object.digest(),
145 previous_transaction: object.previous_transaction,
146 balance: coin.value(),
147 })
148 }
149}
150
/// Counters tracking how balance lookups are served: the `_from_db` pair count
/// requests that had to read the database, the `_from_total` pair count all
/// requests.
pub struct IndexStoreMetrics {
    balance_lookup_from_db: IntCounter,
    balance_lookup_from_total: IntCounter,
    all_balance_lookup_from_db: IntCounter,
    all_balance_lookup_from_total: IntCounter,
}
157
158impl IndexStoreMetrics {
159 pub fn new(registry: &Registry) -> IndexStoreMetrics {
160 Self {
161 balance_lookup_from_db: register_int_counter_with_registry!(
162 "balance_lookup_from_db",
163 "Total number of balance requests served from cache",
164 registry,
165 )
166 .unwrap(),
167 balance_lookup_from_total: register_int_counter_with_registry!(
168 "balance_lookup_from_total",
169 "Total number of balance requests served ",
170 registry,
171 )
172 .unwrap(),
173 all_balance_lookup_from_db: register_int_counter_with_registry!(
174 "all_balance_lookup_from_db",
175 "Total number of all balance requests served from cache",
176 registry,
177 )
178 .unwrap(),
179 all_balance_lookup_from_total: register_int_counter_with_registry!(
180 "all_balance_lookup_from_total",
181 "Total number of all balance requests served",
182 registry,
183 )
184 .unwrap(),
185 }
186 }
187}
188
/// In-memory caches layered over the on-disk index tables.
pub struct IndexStoreCaches {
    /// Cached balance per (address, coin type).
    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
    /// Cached full balance map per address.
    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
    /// Per-address locks taken while indexing coin changes (see `index_coin`).
    pub locks: MutexTable<SuiAddress>,
}
194
/// Owned (Arc-backed) parking_lot mutex guard.
type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;

/// Balance-cache updates bundled with the per-address locks that were held
/// while computing them; dropping this value releases the locks.
pub struct IndexStoreCacheUpdatesWithLocks {
    // Held only for their release-on-drop effect; `None` when no locks were taken.
    pub(crate) _locks: Option<Vec<OwnedMutexGuard<()>>>,
    pub(crate) inner: IndexStoreCacheUpdates,
}
205
206impl IndexStoreCacheUpdatesWithLocks {
207 pub fn into_inner(self) -> IndexStoreCacheUpdates {
208 self.inner
209 }
210}
211
/// Pending balance-cache changes produced while indexing a transaction and
/// consumed by `commit_index_batch` (either applied or used for invalidation).
#[derive(Default)]
pub struct IndexStoreCacheUpdates {
    /// New cached value per (address, coin type).
    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    /// New cached full-balance map per address.
    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
}
219
/// All RocksDB column families backing the JSON-RPC index store. Opened via
/// the `DBMapUtils`-generated `open_tables_read_write_with_deprecation_option`.
#[derive(DBMapUtils)]
pub struct IndexStoreTables {
    /// Singleton `MetadataInfo` record keyed by the unit value; written by `init`.
    meta: DBMap<(), MetadataInfo>,

    /// Transactions sent by an address, keyed by (sender, tx sequence number).
    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    /// Transactions affecting an address (mutated-object owners and
    /// accumulator-event addresses — see `index_tx`).
    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    /// Deprecated; only written when `remove_deprecated_tables` is false.
    #[deprecated]
    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// Deprecated; only written when `remove_deprecated_tables` is false.
    #[deprecated]
    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    /// Transactions keyed by called Move function (package, module, function).
    transactions_by_move_function:
        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,

    /// Local execution order: sequence number -> transaction digest.
    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,

    /// Reverse of `transaction_order`: digest -> sequence number.
    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,

    /// (owner, object id) -> object info for owned-object queries.
    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

    /// Coin index, v2 key schema (descending-balance ordering; see `CoinIndexKey2`).
    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
    /// Set of (address, coin type) pairs touched by accumulator events.
    address_balances: DBMap<(SuiAddress, TypeTag), ()>,

    /// (parent object, field object) -> dynamic field info.
    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,

    // Event indexes: all store the same EventIndex value keyed by a different
    // access path.
    event_order: DBMap<EventId, EventIndex>,
    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
    event_by_time: DBMap<(u64, EventId), EventIndex>,

    /// Tx sequence number below which compaction filters drop index entries.
    pruner_watermark: DBMap<(), TxSequenceNumber>,
}
279
impl IndexStoreTables {
    /// Read-only handle to the owner index.
    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
        &self.owner_index
    }

    /// Read-only handle to the coin index (v2 schema).
    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
        &self.coin_index_2
    }

    /// Panics unless `self` and `other` contain equivalent index data.
    ///
    /// Exact tables are compared entry-for-entry in iteration order; tables
    /// whose keys or values embed node-local components (tx sequence numbers,
    /// event ids, timestamps) are compared with those components stripped,
    /// since two databases indexing the same chain may assign them differently.
    pub fn check_databases_equal(&self, other: &IndexStoreTables) {
        // Strict comparison: identical (key, value) pairs in identical order.
        fn assert_tables_equal<K, V>(name: &str, table_a: &DBMap<K, V>, table_b: &DBMap<K, V>)
        where
            K: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
            V: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
        {
            let mut iter_a = table_a.safe_iter();
            let mut iter_b = table_b.safe_iter();
            let mut count = 0u64;
            loop {
                match (iter_a.next(), iter_b.next()) {
                    (Some(a), Some(b)) => {
                        let (ka, va): (K, V) = a.expect("failed to read from table_a");
                        let (kb, vb): (K, V) = b.expect("failed to read from table_b");
                        assert!(
                            ka == kb && va == vb,
                            "{name}: mismatch at entry {count}:\n a=({ka:?}, {va:?})\n b=({kb:?}, {vb:?})"
                        );
                        count += 1;
                    }
                    (None, None) => break,
                    (Some(_), None) => {
                        panic!(
                            "{name}: table_a has more entries than table_b (diverged after {count} entries)"
                        );
                    }
                    (None, Some(_)) => {
                        panic!(
                            "{name}: table_b has more entries than table_a (diverged after {count} entries)"
                        );
                    }
                }
            }
            info!("{name}: verified {count} entries are identical");
        }

        // Comparison for tables keyed by (prefix, sequence number): the
        // sequence number is node-local, so entries are projected down to
        // (prefix, value), sorted, deduped, and compared pairwise.
        fn assert_seq_table_equal<P, V>(
            name: &str,
            table_a: &DBMap<(P, TxSequenceNumber), V>,
            table_b: &DBMap<(P, TxSequenceNumber), V>,
        ) where
            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
            V: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
        {
            let mut entries_a: Vec<(P, V)> = table_a
                .safe_iter()
                .map(|r| {
                    let ((p, _seq), v) = r.expect("failed to read from table_a");
                    (p, v)
                })
                .collect();
            let mut entries_b: Vec<(P, V)> = table_b
                .safe_iter()
                .map(|r| {
                    let ((p, _seq), v) = r.expect("failed to read from table_b");
                    (p, v)
                })
                .collect();
            entries_a.sort();
            entries_a.dedup();
            entries_b.sort();
            entries_b.dedup();
            assert!(
                entries_a.len() == entries_b.len(),
                "{name}: different number of unique entries: {} vs {}",
                entries_a.len(),
                entries_b.len()
            );
            for (i, (a, b)) in entries_a.iter().zip_debug_eq(entries_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "{name}: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "{name}: verified {} unique entries match (ignoring sequence numbers)",
                entries_a.len()
            );
        }

        // EventIndex values also carry a node-observed timestamp; comparisons
        // keep only the two digests.
        type EventKey = (TransactionEventsDigest, TransactionDigest);
        fn strip_timestamp(ei: EventIndex) -> EventKey {
            (ei.0, ei.1)
        }

        // Like assert_seq_table_equal, but for event tables keyed by
        // (prefix, event id): both the event id and the value's timestamp are
        // ignored.
        fn assert_event_table_equal<P>(
            name: &str,
            table_a: &DBMap<(P, EventId), EventIndex>,
            table_b: &DBMap<(P, EventId), EventIndex>,
        ) where
            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
        {
            let mut entries_a: Vec<(P, EventKey)> = table_a
                .safe_iter()
                .map(|r| {
                    let ((p, _eid), v) = r.expect("failed to read from table_a");
                    (p, strip_timestamp(v))
                })
                .collect();
            let mut entries_b: Vec<(P, EventKey)> = table_b
                .safe_iter()
                .map(|r| {
                    let ((p, _eid), v) = r.expect("failed to read from table_b");
                    (p, strip_timestamp(v))
                })
                .collect();
            entries_a.sort();
            entries_a.dedup();
            entries_b.sort();
            entries_b.dedup();
            assert!(
                entries_a.len() == entries_b.len(),
                "{name}: different number of unique entries: {} vs {}",
                entries_a.len(),
                entries_b.len()
            );
            for (i, (a, b)) in entries_a.iter().zip_debug_eq(entries_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "{name}: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "{name}: verified {} unique entries match (ignoring event ids and timestamps)",
                entries_a.len()
            );
        }

        // Tables with fully deterministic contents: compare exactly.
        assert_tables_equal("owner_index", &self.owner_index, &other.owner_index);
        assert_tables_equal("coin_index_2", &self.coin_index_2, &other.coin_index_2);
        assert_tables_equal(
            "address_balances",
            &self.address_balances,
            &other.address_balances,
        );
        assert_tables_equal(
            "dynamic_field_index",
            &self.dynamic_field_index,
            &other.dynamic_field_index,
        );

        // Sequence-number-keyed tables: compare ignoring sequence numbers.
        assert_seq_table_equal(
            "transactions_from_addr",
            &self.transactions_from_addr,
            &other.transactions_from_addr,
        );
        assert_seq_table_equal(
            "transactions_to_addr",
            &self.transactions_to_addr,
            &other.transactions_to_addr,
        );
        // transactions_by_move_function has a three-part prefix, so it is
        // handled inline instead of through assert_seq_table_equal.
        {
            let mut entries_a: Vec<((ObjectID, String, String), TransactionDigest)> = self
                .transactions_by_move_function
                .safe_iter()
                .map(|r| {
                    let ((pkg, module, func, _seq), digest) =
                        r.expect("failed to read from table_a");
                    ((pkg, module, func), digest)
                })
                .collect();
            let mut entries_b: Vec<((ObjectID, String, String), TransactionDigest)> = other
                .transactions_by_move_function
                .safe_iter()
                .map(|r| {
                    let ((pkg, module, func, _seq), digest) =
                        r.expect("failed to read from table_b");
                    ((pkg, module, func), digest)
                })
                .collect();
            entries_a.sort();
            entries_a.dedup();
            entries_b.sort();
            entries_b.dedup();
            assert!(
                entries_a.len() == entries_b.len(),
                "transactions_by_move_function: different number of unique entries: {} vs {}",
                entries_a.len(),
                entries_b.len()
            );
            for (i, (a, b)) in entries_a.iter().zip_debug_eq(entries_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "transactions_by_move_function: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "transactions_by_move_function: verified {} entries match (ignoring sequence numbers)",
                entries_a.len()
            );
        }

        // event_order's key is itself an event id, so only the stripped values
        // can be compared.
        {
            let mut vals_a: Vec<EventKey> = self
                .event_order
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
                .collect();
            let mut vals_b: Vec<EventKey> = other
                .event_order
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
                .collect();
            vals_a.sort();
            vals_a.dedup();
            vals_b.sort();
            vals_b.dedup();
            assert!(
                vals_a.len() == vals_b.len(),
                "event_order: different number of entries: {} vs {}",
                vals_a.len(),
                vals_b.len()
            );
            for (i, (a, b)) in vals_a.iter().zip_debug_eq(vals_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "event_order: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "event_order: verified {} entries match (ignoring event ids and timestamps)",
                vals_a.len()
            );
        }
        assert_event_table_equal(
            "event_by_move_module",
            &self.event_by_move_module,
            &other.event_by_move_module,
        );
        assert_event_table_equal(
            "event_by_move_event",
            &self.event_by_move_event,
            &other.event_by_move_event,
        );
        assert_event_table_equal(
            "event_by_event_module",
            &self.event_by_event_module,
            &other.event_by_event_module,
        );
        assert_event_table_equal(
            "event_by_sender",
            &self.event_by_sender,
            &other.event_by_sender,
        );
        // event_by_time keys on the node-observed timestamp, so as with
        // event_order only the stripped values are compared.
        {
            let mut vals_a: Vec<EventKey> = self
                .event_by_time
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
                .collect();
            let mut vals_b: Vec<EventKey> = other
                .event_by_time
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
                .collect();
            vals_a.sort();
            vals_a.dedup();
            vals_b.sort();
            vals_b.dedup();
            assert!(
                vals_a.len() == vals_b.len(),
                "event_by_time: different number of entries: {} vs {}",
                vals_a.len(),
                vals_b.len()
            );
            for (i, (a, b)) in vals_a.iter().zip_debug_eq(vals_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "event_by_time: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "event_by_time: verified {} entries match (ignoring timestamps and event ids)",
                vals_a.len()
            );
        }
    }

    /// Ensures the `meta` table holds a metadata record. Existing metadata is
    /// kept as-is; a missing record — or a read error, which is deliberately
    /// treated the same as absence — is replaced with a fresh
    /// `CURRENT_DB_VERSION` entry. No migration logic runs here.
    #[allow(deprecated)]
    fn init(&mut self) -> Result<(), StorageError> {
        let metadata = {
            match self.meta.get(&()) {
                Ok(Some(metadata)) => metadata,
                Ok(None) | Err(_) => MetadataInfo {
                    version: CURRENT_DB_VERSION,
                    column_families: BTreeMap::new(),
                },
            }
        };

        // Write back unconditionally so a fresh database gets its version stamp.
        self.meta.insert(&(), &metadata)?;

        Ok(())
    }

    /// Iterates the dynamic fields of `object` in field-id order, starting
    /// strictly after `cursor` when one is provided.
    pub fn get_dynamic_fields_iterator(
        &self,
        object: ObjectID,
        cursor: Option<ObjectID>,
    ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
        debug!(?object, "get_dynamic_fields");
        // Bound the scan to this object's key range, starting at the cursor
        // (or the beginning when none is given).
        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
        let iter_upper_bound = (object, ObjectID::MAX);
        self.dynamic_field_index
            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
            // The cursor entry itself was returned on the previous page; skip it.
            .skip(usize::from(cursor.is_some()))
            // Stop once iteration leaves this object's keys; errors pass through.
            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
            .map_ok(|((_, c), object_info)| (c, object_info))
    }
}
625
/// The JSON-RPC index store: RocksDB tables plus in-memory balance caches and
/// pruning state.
pub struct IndexStore {
    /// Next transaction sequence number to hand out; initialized from the last
    /// entry of `transaction_order` (see `new_without_init`).
    next_sequence_number: AtomicU64,
    tables: IndexStoreTables,
    pub caches: IndexStoreCaches,
    metrics: Arc<IndexStoreMetrics>,
    /// Upper bound on accepted type-string length; defaults to 128.
    max_type_length: u64,
    /// When set, the deprecated per-object tables are not written (and the
    /// deprecation option is passed through when opening the tables).
    remove_deprecated_tables: bool,
    /// Shared with the RocksDB compaction filters; index entries with a tx
    /// sequence number below this value are removed during compaction.
    pruner_watermark: Arc<AtomicU64>,
}
635
/// Per-column-family counters for the pruning compaction filters.
struct JsonRpcCompactionMetrics {
    /// Keys dropped because their sequence number fell below the watermark.
    key_removed: IntCounterVec,
    /// Keys retained by the filter.
    key_kept: IntCounterVec,
    /// Keys whose decode failed; such keys are kept defensively.
    key_error: IntCounterVec,
}
641
impl JsonRpcCompactionMetrics {
    /// Registers the compaction-filter counters (labelled by column family)
    /// on `registry`. Returned in an `Arc` because the metrics are shared by
    /// every per-table filter closure.
    ///
    /// # Panics
    /// Panics if metrics with these names are already registered.
    pub fn new(registry: &Registry) -> Arc<Self> {
        Arc::new(Self {
            key_removed: register_int_counter_vec_with_registry!(
                "json_rpc_compaction_filter_key_removed",
                "Compaction key removed",
                &["cf"],
                registry
            )
            .unwrap(),
            key_kept: register_int_counter_vec_with_registry!(
                "json_rpc_compaction_filter_key_kept",
                "Compaction key kept",
                &["cf"],
                registry
            )
            .unwrap(),
            key_error: register_int_counter_vec_with_registry!(
                "json_rpc_compaction_filter_key_error",
                "Compaction error",
                &["cf"],
                registry
            )
            .unwrap(),
        })
    }
}
669
670fn compaction_filter_config<T: DeserializeOwned>(
671 name: &str,
672 metrics: Arc<JsonRpcCompactionMetrics>,
673 mut db_options: DBOptions,
674 pruner_watermark: Arc<AtomicU64>,
675 extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
676 by_key: bool,
677) -> DBOptions {
678 let cf = name.to_string();
679 db_options
680 .options
681 .set_compaction_filter(name, move |_, key, value| {
682 let bytes = if by_key { key } else { value };
683 let deserializer = bincode::DefaultOptions::new()
684 .with_big_endian()
685 .with_fixint_encoding();
686 match deserializer.deserialize(bytes) {
687 Ok(key_data) => {
688 let sequence_number = extractor(key_data);
689 if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
690 metrics.key_removed.with_label_values(&[&cf]).inc();
691 Decision::Remove
692 } else {
693 metrics.key_kept.with_label_values(&[&cf]).inc();
694 Decision::Keep
695 }
696 }
697 Err(_) => {
698 metrics.key_error.with_label_values(&[&cf]).inc();
699 Decision::Keep
700 }
701 }
702 });
703 db_options
704}
705
/// Convenience wrapper around `compaction_filter_config` for tables whose
/// sequence number lives in the key (`by_key = true`).
fn compaction_filter_config_by_key<T: DeserializeOwned>(
    name: &str,
    metrics: Arc<JsonRpcCompactionMetrics>,
    db_options: DBOptions,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
) -> DBOptions {
    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
}
715
716fn coin_index_table_default_config() -> DBOptions {
717 default_db_options()
718 .optimize_for_write_throughput()
719 .optimize_for_read(
720 read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
721 )
722 .disable_write_throttling()
723}
724
725impl IndexStore {
    /// Opens (or creates) all index tables at `path` without writing any
    /// metadata; callers wanting an initialized store should use `new`.
    ///
    /// Wires a pruning compaction filter onto each prunable table, seeds the
    /// next sequence number from the last `transaction_order` entry, and loads
    /// the persisted pruner watermark into the shared atomic.
    pub fn new_without_init(
        path: PathBuf,
        registry: &Registry,
        max_type_length: Option<u64>,
        remove_deprecated_tables: bool,
    ) -> Self {
        let db_options = default_db_options().disable_write_throttling();
        // Shared with every compaction filter; the persisted value is stored
        // into it further below.
        let pruner_watermark = Arc::new(AtomicU64::new(0));
        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
        // Per-table options. Each prunable table gets a filter with an
        // extractor that pulls the tx sequence number out of its key type.
        // NOTE(review): `event_by_move_event` has no entry here — confirm it
        // is pruned elsewhere or intentionally unfiltered.
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "transactions_from_addr".to_string(),
                compaction_filter_config_by_key(
                    "transactions_from_addr",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
                ),
            ),
            (
                "transactions_to_addr".to_string(),
                compaction_filter_config_by_key(
                    "transactions_to_addr",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
                ),
            ),
            (
                "transactions_by_move_function".to_string(),
                compaction_filter_config_by_key(
                    "transactions_by_move_function",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
                ),
            ),
            (
                "transaction_order".to_string(),
                compaction_filter_config_by_key(
                    "transaction_order",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |sequence_number: TxSequenceNumber| sequence_number,
                ),
            ),
            (
                // The sequence number is the VALUE here (key is the digest),
                // hence `by_key = false`.
                "transactions_seq".to_string(),
                compaction_filter_config(
                    "transactions_seq",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |sequence_number: TxSequenceNumber| sequence_number,
                    false,
                ),
            ),
            (
                // Not pruned; gets the read-optimized coin index options.
                "coin_index_2".to_string(),
                coin_index_table_default_config(),
            ),
            (
                "event_order".to_string(),
                compaction_filter_config_by_key(
                    "event_order",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |event_id: EventId| event_id.0,
                ),
            ),
            (
                "event_by_move_module".to_string(),
                compaction_filter_config_by_key(
                    "event_by_move_module",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (ModuleId, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_event_module".to_string(),
                compaction_filter_config_by_key(
                    "event_by_event_module",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (ModuleId, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_sender".to_string(),
                compaction_filter_config_by_key(
                    "event_by_sender",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_time".to_string(),
                compaction_filter_config_by_key(
                    "event_by_time",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (u64, EventId)| event_id.0,
                ),
            ),
        ]));
        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path,
            MetricConf::new("index"),
            Some(db_options.options),
            Some(table_options),
            remove_deprecated_tables,
        );

        let metrics = IndexStoreMetrics::new(registry);
        let caches = IndexStoreCaches {
            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
            all_balances: ShardedLruCache::new(1_000_000, 1000),
            locks: MutexTable::new(128),
        };
        // Resume numbering one past the highest recorded sequence number
        // (0 for an empty database).
        let next_sequence_number = tables
            .transaction_order
            .reversed_safe_iter_with_bounds(None, None)
            .expect("failed to initialize indexes")
            .next()
            .transpose()
            .expect("failed to initialize indexes")
            .map(|(seq, _)| seq + 1)
            .unwrap_or(0)
            .into();
        // Publish the persisted watermark to the compaction filters.
        let pruner_watermark_value = tables
            .pruner_watermark
            .get(&())
            .expect("failed to initialize index tables")
            .unwrap_or(0);
        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);

        Self {
            tables,
            next_sequence_number,
            caches,
            metrics: Arc::new(metrics),
            max_type_length: max_type_length.unwrap_or(128),
            remove_deprecated_tables,
            pruner_watermark,
        }
    }
883
884 pub fn new(
885 path: PathBuf,
886 registry: &Registry,
887 max_type_length: Option<u64>,
888 remove_deprecated_tables: bool,
889 ) -> Self {
890 let mut store =
891 Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
892 store.tables.init().unwrap();
893 store
894 }
895
    /// Read-only access to the underlying index tables.
    pub fn tables(&self) -> &IndexStoreTables {
        &self.tables
    }
899
    /// Stages coin-index updates for one transaction into `batch` and returns
    /// the resulting balance-cache deltas. Nothing is written to disk here.
    ///
    /// When `acquire_locks` is set, the per-address locks for every owner whose
    /// object set changed are taken and returned inside the result, so the
    /// caller can hold them until the batch is committed.
    #[instrument(skip_all)]
    pub fn index_coin(
        &self,
        digest: &TransactionDigest,
        batch: &mut StagedBatch,
        object_index_changes: &ObjectIndexChanges,
        tx_coins: Option<TxCoins>,
        acquire_locks: bool,
    ) -> SuiResult<IndexStoreCacheUpdatesWithLocks> {
        // No coin changes: nothing to stage, no locks, empty cache updates.
        if tx_coins.is_none() {
            return Ok(IndexStoreCacheUpdatesWithLocks {
                _locks: None,
                inner: IndexStoreCacheUpdates::default(),
            });
        }

        // Lock every address whose ownership changed (deduped via HashSet).
        let _locks = if acquire_locks {
            let mut addresses: HashSet<SuiAddress> = HashSet::new();
            addresses.extend(
                object_index_changes
                    .deleted_owners
                    .iter()
                    .map(|(owner, _)| *owner),
            );
            addresses.extend(
                object_index_changes
                    .new_owners
                    .iter()
                    .map(|((owner, _), _)| *owner),
            );
            Some(self.caches.locks.acquire_locks(addresses.into_iter()))
        } else {
            None
        };

        let (input_coins, written_coins) = tx_coins.unwrap();
        // Net (balance, coin count) delta per address and coin type,
        // accumulated over deleted inputs and written outputs below.
        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
            HashMap::new();

        // Input coins: their pre-transaction index entries are deleted and
        // their balances subtracted from the owning address.
        let coin_delete_keys = input_coins
            .values()
            .filter_map(|object| {
                // Only address-owned coins are indexed.
                let Owner::AddressOwner(owner) = object.owner() else {
                    return None;
                };

                let (coin_type, coin) = object
                    .coin_type_maybe()
                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;

                let key = CoinIndexKey2::new(
                    *owner,
                    coin_type.to_string(),
                    coin.balance.value(),
                    object.id(),
                );

                let map = balance_changes.entry(*owner).or_default();
                let entry = map.entry(coin_type).or_insert(TotalBalance {
                    num_coins: 0,
                    balance: 0,
                    address_balance: 0,
                });
                entry.num_coins -= 1;
                entry.balance -= coin.balance.value() as i128;

                Some(key)
            })
            .collect::<Vec<_>>();
        trace!(
            tx_digset=?digest,
            "coin_delete_keys: {:?}",
            coin_delete_keys,
        );
        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;

        // Written coins: new index entries are inserted and their balances
        // added back; a coin both consumed and rewritten thus nets out to the
        // difference between its new and old balance.
        let coin_add_keys = written_coins
            .values()
            .filter_map(|object| {
                let Owner::AddressOwner(owner) = object.owner() else {
                    return None;
                };

                let (coin_type, coin) = object
                    .coin_type_maybe()
                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;

                let key = CoinIndexKey2::new(
                    *owner,
                    coin_type.to_string(),
                    coin.balance.value(),
                    object.id(),
                );
                let value = CoinInfo {
                    version: object.version(),
                    digest: object.digest(),
                    balance: coin.balance.value(),
                    previous_transaction: object.previous_transaction,
                };
                let map = balance_changes.entry(*owner).or_default();
                let entry = map.entry(coin_type).or_insert(TotalBalance {
                    num_coins: 0,
                    balance: 0,
                    address_balance: 0,
                });
                entry.num_coins += 1;
                entry.balance += coin.balance.value() as i128;

                Some((key, value))
            })
            .collect::<Vec<_>>();
        trace!(
            tx_digset=?digest,
            "coin_add_keys: {:?}",
            coin_add_keys,
        );

        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;

        // Flatten the accumulated deltas into the two cache-update shapes.
        let per_coin_type_balance_changes: Vec<_> = balance_changes
            .iter()
            .flat_map(|(address, balance_map)| {
                balance_map.iter().map(|(type_tag, balance)| {
                    (
                        (*address, type_tag.clone()),
                        Ok::<TotalBalance, SuiError>(*balance),
                    )
                })
            })
            .collect();
        let all_balance_changes: Vec<_> = balance_changes
            .into_iter()
            .map(|(address, balance_map)| {
                (
                    address,
                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
                )
            })
            .collect();
        let cache_updates = IndexStoreCacheUpdatesWithLocks {
            _locks,
            inner: IndexStoreCacheUpdates {
                per_coin_type_balance_changes,
                all_balance_changes,
            },
        };
        Ok(cache_updates)
    }
1057
    /// Atomically reserves and returns the next transaction sequence number.
    pub fn allocate_sequence_number(&self) -> u64 {
        self.next_sequence_number.fetch_add(1, Ordering::SeqCst)
    }
1061
    /// Stages every index write for one executed transaction into a fresh
    /// batch and returns it, together with the balance-cache updates (and any
    /// address locks) produced by `index_coin`. Nothing is persisted here;
    /// the caller commits via `commit_index_batch`.
    #[instrument(skip_all)]
    pub fn index_tx(
        &self,
        sequence: u64,
        sender: SuiAddress,
        active_inputs: impl Iterator<Item = ObjectID>,
        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
        events: &TransactionEvents,
        object_index_changes: ObjectIndexChanges,
        digest: &TransactionDigest,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        accumulator_events: Vec<AccumulatorEvent>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let mut batch = StagedBatch::new();

        // Ordering indexes: sequence -> digest and digest -> sequence.
        batch.insert_batch(
            &self.tables.transaction_order,
            std::iter::once((sequence, *digest)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_seq,
            std::iter::once((*digest, sequence)),
        )?;

        // Sender index.
        batch.insert_batch(
            &self.tables.transactions_from_addr,
            std::iter::once(((sender, sequence), *digest)),
        )?;

        // Deprecated per-object indexes, maintained only when the store is
        // configured to keep them.
        #[allow(deprecated)]
        if !self.remove_deprecated_tables {
            batch.insert_batch(
                &self.tables.transactions_by_input_object_id,
                active_inputs.map(|id| ((id, sequence), *digest)),
            )?;

            batch.insert_batch(
                &self.tables.transactions_by_mutated_object_id,
                mutated_objects
                    .clone()
                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
            )?;
        }

        // Move-function call index.
        batch.insert_batch(
            &self.tables.transactions_by_move_function,
            move_functions
                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
        )?;

        // Recipient index: address owners of mutated objects plus addresses
        // touched by accumulator events.
        let affected_addresses = mutated_objects
            .filter_map(|(_, owner)| {
                owner
                    .get_address_owner_address()
                    .ok()
                    .map(|addr| ((addr, sequence), digest))
            })
            .chain(
                accumulator_events
                    .iter()
                    .map(|event| ((event.write.address.address, sequence), digest)),
            );
        batch.insert_batch(&self.tables.transactions_to_addr, affected_addresses)?;

        // Coin index and balance-cache deltas (may take per-address locks).
        let cache_updates = self.index_coin(
            digest,
            &mut batch,
            &object_index_changes,
            tx_coins,
            acquire_locks,
        )?;

        // Record which (address, coin type) pairs have accumulator balances.
        let address_balance_updates = accumulator_events.into_iter().filter_map(|event| {
            let ty = &event.write.address.ty;
            let coin_type = sui_types::balance::Balance::maybe_get_balance_type_param(ty)?;
            Some(((event.write.address.address, coin_type), ()))
        });
        batch.insert_batch(&self.tables.address_balances, address_balance_updates)?;

        // Owner / dynamic-field indexes: deletions staged before insertions.
        batch.delete_batch(
            &self.tables.owner_index,
            object_index_changes.deleted_owners.into_iter(),
        )?;
        batch.delete_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.deleted_dynamic_fields.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.owner_index,
            object_index_changes.new_owners.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.new_dynamic_fields.into_iter(),
        )?;

        // Event indexes: every table stores the same (events digest, tx
        // digest, timestamp) value under a different access path.
        let event_digest = events.digest();
        batch.insert_batch(
            &self.tables.event_order,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
        )?;
        // Keyed by the module that emitted the event.
        batch.insert_batch(
            &self.tables.event_by_move_module,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, e)| {
                    (
                        i,
                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
                    )
                })
                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
        )?;
        batch.insert_batch(
            &self.tables.event_by_sender,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.sender, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;
        // Keyed by the event's struct type.
        batch.insert_batch(
            &self.tables.event_by_move_event,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.type_.clone(), (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        batch.insert_batch(
            &self.tables.event_by_time,
            events.data.iter().enumerate().map(|(i, _)| {
                (
                    (timestamp_ms, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        // Keyed by the module that DECLARES the event type (vs. the emitting
        // module above).
        batch.insert_batch(
            &self.tables.event_by_event_module,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (
                        ModuleId::new(e.type_.address, e.type_.module.clone()),
                        (sequence, i),
                    ),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        Ok((batch, cache_updates))
    }
1236
1237 pub fn commit_index_batch(
1239 &self,
1240 batch: DBBatch,
1241 cache_updates: Vec<IndexStoreCacheUpdates>,
1242 ) -> SuiResult {
1243 let invalidate_caches =
1244 read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
1245
1246 if invalidate_caches {
1247 for cu in &cache_updates {
1248 self.invalidate_per_coin_type_cache(
1249 cu.per_coin_type_balance_changes.iter().map(|x| x.0.clone()),
1250 )?;
1251 self.invalidate_all_balance_cache(cu.all_balance_changes.iter().map(|x| x.0))?;
1252 }
1253 }
1254
1255 batch.write()?;
1256
1257 if !invalidate_caches {
1258 for cu in cache_updates {
1259 self.update_per_coin_type_cache(cu.per_coin_type_balance_changes)?;
1260 self.update_all_balance_cache(cu.all_balance_changes)?;
1261 }
1262 }
1263 Ok(())
1264 }
1265
    /// Creates an empty write batch for the index DB. The table used to mint
    /// the batch is arbitrary; the batch can span other tables of the same DB.
    pub fn new_db_batch(&self) -> DBBatch {
        self.tables.transactions_from_addr.batch()
    }
1269
    /// Returns the next transaction sequence number to be assigned (one past
    /// the highest value currently stored in the atomic counter).
    pub fn next_sequence_number(&self) -> TxSequenceNumber {
        self.next_sequence_number.load(Ordering::SeqCst) + 1
    }
1273
    /// Returns digests of transactions matching `filter`, paged from `cursor`.
    ///
    /// The digest cursor is resolved to its local sequence number first
    /// (erroring if unknown); paging is exclusive of the cursor row itself.
    /// With no filter, the global `transaction_order` table is scanned.
    /// Filter variants other than the five handled below are rejected as
    /// unsupported user input.
    #[instrument(skip(self))]
    pub fn get_transactions(
        &self,
        filter: Option<TransactionFilter>,
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        // Resolve the digest cursor to a sequence number up front.
        let cursor = if let Some(cursor) = cursor {
            Some(
                self.get_transaction_seq(&cursor)?
                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
            )
        } else {
            None
        };
        match filter {
            Some(TransactionFilter::MoveFunction {
                package,
                module,
                function,
            }) => Ok(self.get_transactions_by_move_function(
                package, module, function, cursor, limit, reverse,
            )?),
            Some(TransactionFilter::InputObject(object_id)) => {
                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::ChangedObject(object_id)) => {
                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::FromAddress(address)) => {
                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::ToAddress(address)) => {
                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
            }
            Some(_) => Err(SuiErrorKind::UserInputError {
                error: UserInputError::Unsupported(format!("{:?}", filter)),
            }
            .into()),
            None => {
                if reverse {
                    // Scan downward from the cursor (or from the maximum
                    // sequence number), skipping the cursor row if supplied.
                    let iter = self
                        .tables
                        .transaction_order
                        .reversed_safe_iter_with_bounds(
                            None,
                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
                        )?
                        .skip(usize::from(cursor.is_some()))
                        .map(|result| result.map(|(_, digest)| digest));
                    if let Some(limit) = limit {
                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
                    } else {
                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
                    }
                } else {
                    // Scan upward from the cursor (or from the minimum
                    // sequence number), likewise skipping the cursor row.
                    let iter = self
                        .tables
                        .transaction_order
                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
                        .skip(usize::from(cursor.is_some()))
                        .map(|result| result.map(|(_, digest)| digest));
                    if let Some(limit) = limit {
                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
                    } else {
                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
                    }
                }
            }
        }
    }
1349
    /// Generic pager over a `(key, tx_seq) -> digest` index: returns digests
    /// whose key equals `key`, paged from `cursor` (exclusive when supplied)
    /// and capped at `limit`, ascending or descending by sequence number.
    #[instrument(skip_all)]
    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
        key: KeyT,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Ok(if reverse {
            let iter = index
                .reversed_safe_iter_with_bounds(
                    None,
                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
                )?
                .skip(usize::from(cursor.is_some()))
                // Stop once the scan leaves this key's range. An Err item also
                // maps to `false` here, silently ending the scan.
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((id, _), _)| *id == key)
                        .unwrap_or(false)
                })
                .map(|result| result.map(|(_, digest)| digest));
            if let Some(limit) = limit {
                iter.take(limit).collect::<Result<Vec<_>, _>>()?
            } else {
                iter.collect::<Result<Vec<_>, _>>()?
            }
        } else {
            let iter = index
                .safe_iter_with_bounds(
                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
                    None,
                )
                .skip(usize::from(cursor.is_some()))
                // NOTE(review): the forward path panics on iterator errors
                // while the reverse path above propagates them — this matches
                // the file-wide pattern for forward scans; confirm intended.
                .map(|result| result.expect("iterator db error"))
                .take_while(|((id, _), _)| *id == key)
                .map(|(_, digest)| digest);
            if let Some(limit) = limit {
                iter.take(limit).collect()
            } else {
                iter.collect()
            }
        })
    }
1396
1397 #[instrument(skip(self))]
1398 pub fn get_transactions_by_input_object(
1399 &self,
1400 input_object: ObjectID,
1401 cursor: Option<TxSequenceNumber>,
1402 limit: Option<usize>,
1403 reverse: bool,
1404 ) -> SuiResult<Vec<TransactionDigest>> {
1405 if self.remove_deprecated_tables {
1406 return Ok(vec![]);
1407 }
1408 #[allow(deprecated)]
1409 Self::get_transactions_from_index(
1410 &self.tables.transactions_by_input_object_id,
1411 input_object,
1412 cursor,
1413 limit,
1414 reverse,
1415 )
1416 }
1417
1418 #[instrument(skip(self))]
1419 pub fn get_transactions_by_mutated_object(
1420 &self,
1421 mutated_object: ObjectID,
1422 cursor: Option<TxSequenceNumber>,
1423 limit: Option<usize>,
1424 reverse: bool,
1425 ) -> SuiResult<Vec<TransactionDigest>> {
1426 if self.remove_deprecated_tables {
1427 return Ok(vec![]);
1428 }
1429 #[allow(deprecated)]
1430 Self::get_transactions_from_index(
1431 &self.tables.transactions_by_mutated_object_id,
1432 mutated_object,
1433 cursor,
1434 limit,
1435 reverse,
1436 )
1437 }
1438
    /// Returns digests of transactions sent by `addr`, paged from `cursor`.
    #[instrument(skip(self))]
    pub fn get_transactions_from_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_from_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }
1455
    /// Returns digests of transactions that called a Move function in
    /// `package`, optionally narrowed to a `module` and `function`, paged
    /// from `cursor`.
    ///
    /// Input rules: `function` requires `module`; a `cursor` requires both
    /// module and function (otherwise skipping one row past the cursor could
    /// drop a legitimate result). Missing module/function components are
    /// widened to the extreme bound for the scan direction: "" ascending, or
    /// a "zzz…" string of `max_type_length` descending.
    #[instrument(skip(self))]
    pub fn get_transactions_by_move_function(
        &self,
        package: ObjectID,
        module: Option<String>,
        function: Option<String>,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        if function.is_some() && module.is_none() {
            return Err(SuiErrorKind::UserInputError {
                error: UserInputError::MoveFunctionInputError(
                    "Cannot supply function without supplying module".to_string(),
                ),
            }
            .into());
        }

        if cursor.is_some() && (module.is_none() || function.is_none()) {
            return Err(SuiErrorKind::UserInputError {
                error: UserInputError::MoveFunctionInputError(
                    "Cannot supply cursor without supplying module and function".to_string(),
                ),
            }
            .into());
        }

        let cursor_val = cursor.unwrap_or(if reverse {
            TxSequenceNumber::MAX
        } else {
            TxSequenceNumber::MIN
        });

        // "z" repeated max_type_length times acts as an upper bound for any
        // legal module/function name in the key ordering.
        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
        let module_val = module.clone().unwrap_or(if reverse {
            max_string.clone()
        } else {
            "".to_string()
        });

        let function_val =
            function
                .clone()
                .unwrap_or(if reverse { max_string } else { "".to_string() });

        let key = (package, module_val, function_val, cursor_val);
        Ok(if reverse {
            let iter = self
                .tables
                .transactions_by_move_function
                .reversed_safe_iter_with_bounds(None, Some(key))?
                .skip(usize::from(cursor.is_some()))
                // Keep rows only while they match the package and any
                // module/function narrowing the caller supplied.
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((id, m, f, _), _)| {
                            *id == package
                                && module.as_ref().map(|x| x == m).unwrap_or(true)
                                && function.as_ref().map(|x| x == f).unwrap_or(true)
                        })
                        .unwrap_or(false)
                })
                .map(|result| result.map(|(_, digest)| digest));
            if let Some(limit) = limit {
                iter.take(limit).collect::<Result<Vec<_>, _>>()?
            } else {
                iter.collect::<Result<Vec<_>, _>>()?
            }
        } else {
            let iter = self
                .tables
                .transactions_by_move_function
                .safe_iter_with_bounds(Some(key), None)
                .map(|result| result.expect("iterator db error"))
                .skip(usize::from(cursor.is_some()))
                .take_while(|((id, m, f, _), _)| {
                    *id == package
                        && module.as_ref().map(|x| x == m).unwrap_or(true)
                        && function.as_ref().map(|x| x == f).unwrap_or(true)
                })
                .map(|(_, digest)| digest);
            if let Some(limit) = limit {
                iter.take(limit).collect()
            } else {
                iter.collect()
            }
        })
    }
1549
    /// Returns digests of transactions that affected `addr` as a recipient,
    /// paged from `cursor`.
    #[instrument(skip(self))]
    pub fn get_transactions_to_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_to_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }
1566
    /// Maps a transaction digest to its locally-assigned sequence number, or
    /// `None` when the transaction has not been indexed.
    #[instrument(skip(self))]
    pub fn get_transaction_seq(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TxSequenceNumber>> {
        Ok(self.tables.transactions_seq.get(digest)?)
    }
1574
1575 #[instrument(skip(self))]
1576 pub fn all_events(
1577 &self,
1578 tx_seq: TxSequenceNumber,
1579 event_seq: usize,
1580 limit: usize,
1581 descending: bool,
1582 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1583 Ok(if descending {
1584 self.tables
1585 .event_order
1586 .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1587 .take(limit)
1588 .map(|result| {
1589 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1590 (digest, tx_digest, event_seq, time)
1591 })
1592 })
1593 .collect::<Result<Vec<_>, _>>()?
1594 } else {
1595 self.tables
1596 .event_order
1597 .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1598 .take(limit)
1599 .map(|result| {
1600 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1601 (digest, tx_digest, event_seq, time)
1602 })
1603 })
1604 .collect::<Result<Vec<_>, _>>()?
1605 })
1606 }
1607
    /// Lists up to `limit` events emitted by the transaction with `digest`,
    /// starting from the `(tx_seq, event_seq)` cursor position.
    ///
    /// The digest is resolved to its indexed sequence number `seq`; the
    /// caller's `tx_seq` is clamped against it (`min` when descending, `max`
    /// when ascending) and the scan stops as soon as it leaves `seq`'s rows.
    #[instrument(skip(self))]
    pub fn events_by_transaction(
        &self,
        digest: &TransactionDigest,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        let seq = self
            .get_transaction_seq(digest)?
            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
        Ok(if descending {
            // NOTE(review): if the clamp moves the start outside `seq`'s
            // range, take_while yields nothing — presumably intentional.
            self.tables
                .event_order
                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((tx, _), _)| tx == &seq)
                        .unwrap_or(false)
                })
                .take(limit)
                .map(|result| {
                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            self.tables
                .event_order
                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
                .map(|result| result.expect("iterator db error"))
                .take_while(|((tx, _), _)| tx == &seq)
                .take(limit)
                .map(|((_, event_seq), (digest, tx_digest, time))| {
                    (digest, tx_digest, event_seq, time)
                })
                .collect()
        })
    }
1650
    /// Generic pager over a `(key, (tx_seq, event_seq)) -> event` index:
    /// yields up to `limit` entries whose key equals `key`, starting at the
    /// `(tx_seq, event_seq)` position, ascending or descending.
    #[instrument(skip_all)]
    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
        key: &KeyT,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Ok(if descending {
            index
                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
                // Stop once the scan leaves this key's range (an Err also
                // stops it here).
                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
                .take(limit)
                .map(|result| {
                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            index
                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
                .map(|result| result.expect("iterator db error"))
                .take_while(|((m, _), _)| m == key)
                .take(limit)
                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                    (digest, tx_digest, event_seq, time)
                })
                .collect()
        })
    }
1683
    /// Lists up to `limit` events emitted by transactions whose entry module
    /// is `module`, starting from `(tx_seq, event_seq)`.
    #[instrument(skip(self))]
    pub fn events_by_module_id(
        &self,
        module: &ModuleId,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_move_module,
            module,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1702
    /// Lists up to `limit` events whose Move event type equals `struct_name`,
    /// starting from `(tx_seq, event_seq)`.
    #[instrument(skip(self))]
    pub fn events_by_move_event_struct_name(
        &self,
        struct_name: &StructTag,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_move_event,
            struct_name,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1721
    /// Lists up to `limit` events whose event type is declared in
    /// `module_id`, starting from `(tx_seq, event_seq)`.
    #[instrument(skip(self))]
    pub fn events_by_move_event_module(
        &self,
        module_id: &ModuleId,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_event_module,
            module_id,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1740
    /// Lists up to `limit` events whose sender is `sender`, starting from
    /// `(tx_seq, event_seq)`.
    #[instrument(skip(self))]
    pub fn events_by_sender(
        &self,
        sender: &SuiAddress,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_sender,
            sender,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1759
    /// Lists up to `limit` events whose timestamp lies within
    /// `[start_time, end_time]`, starting from `(tx_seq, event_seq)` at the
    /// boundary timestamp, ascending or descending.
    #[instrument(skip(self))]
    pub fn event_iterator(
        &self,
        start_time: u64,
        end_time: u64,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Ok(if descending {
            // Walk down from end_time; stop once below start_time.
            self.tables
                .event_by_time
                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((m, _), _)| m >= &start_time)
                        .unwrap_or(false)
                })
                .take(limit)
                .map(|result| {
                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            // Walk up from start_time; stop once above end_time.
            self.tables
                .event_by_time
                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
                .map(|result| result.expect("iterator db error"))
                .take_while(|((m, _), _)| m <= &end_time)
                .take(limit)
                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                    (digest, tx_digest, event_seq, time)
                })
                .collect()
        })
    }
1800
1801 pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1802 match self
1803 .tables
1804 .event_by_time
1805 .reversed_safe_iter_with_bounds(
1806 None,
1807 Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1808 )?
1809 .next()
1810 .transpose()?
1811 {
1812 Some(((_, (watermark, _)), _)) => {
1813 if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1814 info!(
1815 "json rpc index pruning. Watermark is {} with digest {}",
1816 watermark, digest
1817 );
1818 }
1819 self.pruner_watermark.store(watermark, Ordering::Relaxed);
1820 self.tables.pruner_watermark.insert(&(), &watermark)?;
1821 Ok(watermark)
1822 }
1823 None => Ok(0),
1824 }
1825 }
1826
    /// Streams `(field_object_id, DynamicFieldInfo)` entries for the dynamic
    /// fields of `object`, paged from `cursor`; the actual scan is delegated
    /// to the tables helper.
    pub fn get_dynamic_fields_iterator(
        &self,
        object: ObjectID,
        cursor: Option<ObjectID>,
    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
    {
        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
    }
1835
    /// Resolves the object id stored under the dynamic field of `object`
    /// named by `(name_type, name_bcs_bytes)`.
    ///
    /// First tries the id derived for a plain dynamic field; on a miss,
    /// retries with the id derived for the `Wrapper<name_type>` form used by
    /// dynamic *object* fields. Returns `None` when neither derivation has an
    /// index entry.
    #[instrument(skip(self))]
    pub fn get_dynamic_field_object_id(
        &self,
        object: ObjectID,
        name_type: TypeTag,
        name_bcs_bytes: &[u8],
    ) -> SuiResult<Option<ObjectID>> {
        debug!(?object, "get_dynamic_field_object_id");
        let dynamic_field_id =
            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
                |e| {
                    SuiErrorKind::Unknown(format!(
                        "Unable to generate dynamic field id. Got error: {e:?}"
                    ))
                },
            )?;

        if let Some(info) = self
            .tables
            .dynamic_field_index
            .get(&(object, dynamic_field_id))?
        {
            // The stored id should match the derived one, unless the caller
            // already passed the wrapper type of a dynamic object field.
            debug_assert!(
                info.object_id == dynamic_field_id
                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
            );
            return Ok(Some(info.object_id));
        }

        // Miss on the plain form: retry with the dynamic-object-field wrapper.
        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
            object,
            &dynamic_object_field_type,
            name_bcs_bytes,
        )
        .map_err(|e| {
            SuiErrorKind::Unknown(format!(
                "Unable to generate dynamic field id. Got error: {e:?}"
            ))
        })?;
        if let Some(info) = self
            .tables
            .dynamic_field_index
            .get(&(object, dynamic_object_field_id))?
        {
            return Ok(Some(info.object_id));
        }

        Ok(None)
    }
1888
1889 #[instrument(skip(self))]
1890 pub fn get_owner_objects(
1891 &self,
1892 owner: SuiAddress,
1893 cursor: Option<ObjectID>,
1894 limit: usize,
1895 filter: Option<SuiObjectDataFilter>,
1896 ) -> SuiResult<Vec<ObjectInfo>> {
1897 let cursor = match cursor {
1898 Some(cursor) => cursor,
1899 None => ObjectID::ZERO,
1900 };
1901 Ok(self
1902 .get_owner_objects_iterator(owner, cursor, filter)?
1903 .take(limit)
1904 .collect())
1905 }
1906
1907 pub fn get_address_balance_coin_types_iter(
1908 &self,
1909 owner: SuiAddress,
1910 ) -> impl Iterator<Item = TypeTag> {
1911 let start_key = (owner, TypeTag::Bool);
1912 self.tables()
1913 .address_balances
1914 .safe_iter_with_bounds(Some(start_key), None)
1915 .map(|result| result.expect("iterator db error"))
1916 .take_while(move |(key, _)| key.0 == owner)
1917 .map(|(key, _)| key.1)
1918 }
1919
1920 pub fn get_owned_coins_iterator(
1921 coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1922 owner: SuiAddress,
1923 coin_type_tag: Option<String>,
1924 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1925 let all_coins = coin_type_tag.is_none();
1926 let starting_coin_type =
1927 coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1928 let start_key =
1929 CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1930 Ok(coin_index
1931 .safe_iter_with_bounds(Some(start_key), None)
1932 .map(|result| result.expect("iterator db error"))
1933 .take_while(move |(key, _)| {
1934 if key.owner != owner {
1935 return false;
1936 }
1937 if !all_coins && starting_coin_type != key.coin_type {
1938 return false;
1939 }
1940 true
1941 }))
1942 }
1943
1944 pub fn get_owned_coins_iterator_with_cursor(
1945 &self,
1946 owner: SuiAddress,
1947 cursor: (String, u64, ObjectID),
1948 limit: usize,
1949 one_coin_type_only: bool,
1950 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1951 let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1952 let start_key = CoinIndexKey2::new_from_cursor(
1953 owner,
1954 starting_coin_type.clone(),
1955 inverted_balance,
1956 starting_object_id,
1957 );
1958 Ok(self
1959 .tables
1960 .coin_index_2
1961 .safe_iter_with_bounds(Some(start_key), None)
1962 .map(|result| result.expect("iterator db error"))
1963 .filter(move |(key, _)| key.object_id != starting_object_id)
1964 .enumerate()
1965 .take_while(move |(index, (key, _))| {
1966 if *index >= limit {
1967 return false;
1968 }
1969 if key.owner != owner {
1970 return false;
1971 }
1972 if one_coin_type_only && starting_coin_type != key.coin_type {
1973 return false;
1974 }
1975 true
1976 })
1977 .map(|(_index, (key, info))| (key, info)))
1978 }
1979
1980 pub fn get_owner_objects_iterator(
1983 &self,
1984 owner: SuiAddress,
1985 starting_object_id: ObjectID,
1986 filter: Option<SuiObjectDataFilter>,
1987 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1988 Ok(self
1989 .tables
1990 .owner_index
1991 .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1993 .map(|result| result.expect("iterator db error"))
1994 .skip(usize::from(starting_object_id != ObjectID::ZERO))
1995 .take_while(move |((address_owner, _), _)| address_owner == &owner)
1996 .filter(move |(_, o)| {
1997 if let Some(filter) = filter.as_ref() {
1998 filter.matches(o)
1999 } else {
2000 true
2001 }
2002 })
2003 .map(|(_, object_info)| object_info))
2004 }
2005
2006 pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
2007 let mut batch = self.tables.owner_index.batch();
2008 batch.insert_batch(
2009 &self.tables.owner_index,
2010 object_index_changes.new_owners.into_iter(),
2011 )?;
2012 batch.insert_batch(
2013 &self.tables.dynamic_field_index,
2014 object_index_changes.new_dynamic_fields.into_iter(),
2015 )?;
2016 batch.write()?;
2017 Ok(())
2018 }
2019
    /// True when the owner index has no entries — used as a proxy for "this
    /// index store has never been populated".
    pub fn is_empty(&self) -> bool {
        self.tables.owner_index.is_empty()
    }
2023
    /// Takes a checkpoint of the index DB at `path`. The table handle is
    /// arbitrary — presumably the checkpoint covers the whole underlying DB;
    /// confirm against the typed_store implementation.
    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .transactions_from_addr
            .checkpoint_db(path)
            .map_err(Into::into)
    }
2031
2032 #[instrument(skip(self))]
2037 pub fn get_coin_object_balance(
2038 &self,
2039 owner: SuiAddress,
2040 coin_type: TypeTag,
2041 ) -> SuiResult<TotalBalance> {
2042 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2043 let cloned_coin_type = coin_type.clone();
2044 let metrics_cloned = self.metrics.clone();
2045 let coin_index_cloned = self.tables.coin_index_2.clone();
2046 if force_disable_cache {
2047 return Self::get_balance_from_db(
2048 metrics_cloned,
2049 coin_index_cloned,
2050 owner,
2051 cloned_coin_type,
2052 )
2053 .map_err(|e| {
2054 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2055 .into()
2056 });
2057 }
2058
2059 self.metrics.balance_lookup_from_total.inc();
2060
2061 let balance = self
2062 .caches
2063 .per_coin_type_balance
2064 .get(&(owner, coin_type.clone()));
2065 if let Some(balance) = balance {
2066 return balance;
2067 }
2068 let all_balance = self.caches.all_balances.get(&owner.clone());
2070 if let Some(Ok(all_balance)) = all_balance
2071 && let Some(balance) = all_balance.get(&coin_type)
2072 {
2073 return Ok(*balance);
2074 }
2075 let cloned_coin_type = coin_type.clone();
2076 let metrics_cloned = self.metrics.clone();
2077 let coin_index_cloned = self.tables.coin_index_2.clone();
2078 self.caches
2079 .per_coin_type_balance
2080 .get_with((owner, coin_type), move || {
2081 Self::get_balance_from_db(
2082 metrics_cloned,
2083 coin_index_cloned,
2084 owner,
2085 cloned_coin_type,
2086 )
2087 .map_err(|e| {
2088 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2089 .into()
2090 })
2091 })
2092 }
2093
2094 #[instrument(skip(self))]
2100 pub fn get_all_coin_object_balances(
2101 &self,
2102 owner: SuiAddress,
2103 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2104 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2105 let metrics_cloned = self.metrics.clone();
2106 let coin_index_cloned = self.tables.coin_index_2.clone();
2107 if force_disable_cache {
2108 return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
2109 .map_err(|e| {
2110 SuiErrorKind::ExecutionError(format!(
2111 "Failed to read all balance from DB: {:?}",
2112 e
2113 ))
2114 .into()
2115 });
2116 }
2117
2118 self.metrics.all_balance_lookup_from_total.inc();
2119 let metrics_cloned = self.metrics.clone();
2120 let coin_index_cloned = self.tables.coin_index_2.clone();
2121 self.caches.all_balances.get_with(owner, move || {
2122 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
2123 SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
2124 .into()
2125 })
2126 })
2127 }
2128
2129 #[instrument(skip_all)]
2131 pub fn get_balance_from_db(
2132 metrics: Arc<IndexStoreMetrics>,
2133 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2134 owner: SuiAddress,
2135 coin_type: TypeTag,
2136 ) -> SuiResult<TotalBalance> {
2137 metrics.balance_lookup_from_db.inc();
2138 let coin_type_str = coin_type.to_string();
2139 let coins =
2140 Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
2141
2142 let mut balance = 0i128;
2143 let mut num_coins = 0;
2144 for (_key, coin_info) in coins {
2145 balance += coin_info.balance as i128;
2146 num_coins += 1;
2147 }
2148 Ok(TotalBalance {
2149 balance,
2150 num_coins,
2151 address_balance: 0,
2152 })
2153 }
2154
2155 #[instrument(skip_all)]
2157 pub fn get_all_balances_from_db(
2158 metrics: Arc<IndexStoreMetrics>,
2159 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2160 owner: SuiAddress,
2161 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2162 metrics.all_balance_lookup_from_db.inc();
2163 let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
2164 let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
2165 .chunk_by(|(key, _coin)| key.coin_type.clone());
2166 for (coin_type, coins) in &coins {
2167 let mut total_balance = 0i128;
2168 let mut coin_object_count = 0;
2169 for (_, coin_info) in coins {
2170 total_balance += coin_info.balance as i128;
2171 coin_object_count += 1;
2172 }
2173 let coin_type =
2174 TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
2175 SuiErrorKind::ExecutionError(format!(
2176 "Failed to parse event sender address: {:?}",
2177 e
2178 ))
2179 })?));
2180 balances.insert(
2181 coin_type,
2182 TotalBalance {
2183 num_coins: coin_object_count,
2184 balance: total_balance,
2185 address_balance: 0,
2186 },
2187 );
2188 }
2189 Ok(Arc::new(balances))
2190 }
2191
    /// Drops the given `(owner, coin_type)` entries from the per-coin-type
    /// balance cache so the next lookup re-reads from the DB.
    fn invalidate_per_coin_type_cache(
        &self,
        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
    ) -> SuiResult {
        self.caches.per_coin_type_balance.batch_invalidate(keys);
        Ok(())
    }
2199
    /// Drops the given owners' entries from the all-balances cache so the
    /// next lookup re-reads from the DB.
    fn invalidate_all_balance_cache(
        &self,
        addresses: impl IntoIterator<Item = SuiAddress>,
    ) -> SuiResult {
        self.caches.all_balances.batch_invalidate(addresses);
        Ok(())
    }
2207
    /// Merges balance deltas into the per-coin-type cache via
    /// `Self::merge_balance`.
    fn update_per_coin_type_cache(
        &self,
        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    ) -> SuiResult {
        self.caches
            .per_coin_type_balance
            .batch_merge(keys, Self::merge_balance);
        Ok(())
    }
2217
2218 fn merge_balance(
2219 old_balance: &SuiResult<TotalBalance>,
2220 balance_delta: &SuiResult<TotalBalance>,
2221 ) -> SuiResult<TotalBalance> {
2222 if let Ok(old_balance) = old_balance {
2223 if let Ok(balance_delta) = balance_delta {
2224 Ok(TotalBalance {
2225 balance: old_balance.balance + balance_delta.balance,
2226 num_coins: old_balance.num_coins + balance_delta.num_coins,
2227 address_balance: old_balance.address_balance,
2228 })
2229 } else {
2230 balance_delta.clone()
2231 }
2232 } else {
2233 old_balance.clone()
2234 }
2235 }
2236
    /// Merges per-owner balance-map deltas into the all-balances cache via
    /// `Self::merge_all_balance`.
    fn update_all_balance_cache(
        &self,
        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
    ) -> SuiResult {
        self.caches
            .all_balances
            .batch_merge(keys, Self::merge_all_balance);
        Ok(())
    }
2246
2247 fn merge_all_balance(
2248 old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2249 balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2250 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2251 if let Ok(old_balance) = old_balance {
2252 if let Ok(balance_delta) = balance_delta {
2253 let mut new_balance = HashMap::new();
2254 for (key, value) in old_balance.iter() {
2255 new_balance.insert(key.clone(), *value);
2256 }
2257 for (key, delta) in balance_delta.iter() {
2258 let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
2259 balance: 0,
2260 num_coins: 0,
2261 address_balance: 0,
2262 });
2263 let new_total = TotalBalance {
2264 balance: old.balance + delta.balance,
2265 num_coins: old.num_coins + delta.num_coins,
2266 address_balance: old.address_balance,
2267 };
2268 new_balance.insert(key.clone(), new_total);
2269 }
2270 Ok(Arc::new(new_balance))
2271 } else {
2272 balance_delta.clone()
2273 }
2274 } else {
2275 old_balance.clone()
2276 }
2277 }
2278}
2279
#[cfg(test)]
mod tests {
    use super::IndexStore;
    use super::ObjectIndexChanges;
    use move_core_types::account_address::AccountAddress;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use std::env::temp_dir;
    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
    use sui_types::digests::TransactionDigest;
    use sui_types::effects::TransactionEvents;
    use sui_types::gas_coin::GAS;
    use sui_types::object;
    use sui_types::object::Owner;

    /// Indexes ten 100-MIST gas coins for a single owner, then deletes three
    /// of them in a second transaction, checking after each commit that the
    /// cached per-coin-type balance agrees with a balance recomputed directly
    /// from the DB — both before and after an explicit cache invalidation.
    #[tokio::test]
    async fn test_index_cache() -> anyhow::Result<()> {
        let store =
            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
        let owner: SuiAddress = AccountAddress::random().into();
        let mut written_objects = BTreeMap::new();
        let mut input_objects = BTreeMap::new();
        let mut object_map = BTreeMap::new();

        // Mint ten gas coins of 100 MIST each, all owned by `owner`.
        let mut new_owners = vec![];
        for _ in 0..10 {
            let coin = object::Object::new_gas_with_balance_and_owner_for_testing(100, owner);
            let info = ObjectInfo {
                object_id: coin.id(),
                version: coin.version(),
                digest: coin.digest(),
                type_: ObjectType::Struct(coin.type_().unwrap().clone()),
                owner: Owner::AddressOwner(owner),
                previous_transaction: coin.previous_transaction,
            };
            new_owners.push(((owner, coin.id()), info));
            object_map.insert(coin.id(), coin.clone());
            written_objects.insert(coin.data.id(), coin);
        }
        let changes = ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners,
            new_dynamic_fields: vec![],
        };

        // Index the "mint" transaction and commit the resulting batch.
        let tx_coins = (input_objects.clone(), written_objects.clone());
        let seq = store.allocate_sequence_number();
        let (raw_batch, cache_updates) = store.index_tx(
            seq,
            owner,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
            vec![],
            false,
        )?;
        let mut batch = store.new_db_batch();
        batch.concat(vec![raw_batch]).unwrap();
        store
            .commit_index_batch(batch, vec![cache_updates.into_inner()])
            .unwrap();

        // The cached balance must match a fresh read straight from the DB.
        let db_balance = IndexStore::get_balance_from_db(
            store.metrics.clone(),
            store.tables.coin_index_2.clone(),
            owner,
            GAS::type_tag(),
        )?;
        let cached = store.get_coin_object_balance(owner, GAS::type_tag())?;
        assert_eq!(cached, db_balance);
        assert_eq!(cached.balance, 1000);
        assert_eq!(cached.num_coins, 10);

        let all_balances = store.get_all_coin_object_balances(owner)?;
        let gas_balance = all_balances.get(&GAS::type_tag()).unwrap();
        assert_eq!(*gas_balance, db_balance);
        assert_eq!(gas_balance.balance, 1000);
        assert_eq!(gas_balance.num_coins, 10);

        // Second transaction: delete three of the coins.
        written_objects.clear();
        let mut deleted_owners = vec![];
        for (id, coin) in object_map.iter().take(3) {
            deleted_owners.push((owner, *id));
            input_objects.insert(*id, coin.to_owned());
        }
        let changes = ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields: vec![],
            new_owners: vec![],
            new_dynamic_fields: vec![],
        };
        let tx_coins = (input_objects, written_objects);
        let seq = store.allocate_sequence_number();
        let (raw_batch, cache_updates) = store.index_tx(
            seq,
            owner,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
            vec![],
            false,
        )?;
        let mut batch = store.new_db_batch();
        batch.concat(vec![raw_batch]).unwrap();
        store
            .commit_index_batch(batch, vec![cache_updates.into_inner()])
            .unwrap();

        // Seven coins of 100 MIST remain; cache and DB must still agree.
        let db_balance = IndexStore::get_balance_from_db(
            store.metrics.clone(),
            store.tables.coin_index_2.clone(),
            owner,
            GAS::type_tag(),
        )?;
        let cached = store.get_coin_object_balance(owner, GAS::type_tag())?;
        assert_eq!(cached, db_balance);
        assert_eq!(cached.balance, 700);
        assert_eq!(cached.num_coins, 7);

        // Force a cache miss and confirm the recomputed values still agree.
        store
            .caches
            .per_coin_type_balance
            .invalidate(&(owner, GAS::type_tag()));
        let all_balances = store.get_all_coin_object_balances(owner)?;
        assert_eq!(all_balances.get(&GAS::type_tag()).unwrap().balance, 700);
        assert_eq!(all_balances.get(&GAS::type_tag()).unwrap().num_coins, 7);
        let cached = store.get_coin_object_balance(owner, GAS::type_tag())?;
        assert_eq!(cached, db_balance);
        assert_eq!(cached.balance, 700);
        assert_eq!(cached.num_coins, 7);

        Ok(())
    }

    /// Seeds the `transactions_by_move_function` index with entries whose
    /// function names include 128-character strings (to stress key ordering),
    /// then checks that a descending scan yields exactly the reverse of an
    /// ascending one.
    #[tokio::test]
    async fn test_get_transaction_by_move_function() {
        use sui_types::base_types::ObjectID;
        use typed_store::Map;

        let store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
        let table = &store.tables.transactions_by_move_function;
        let package = ObjectID::new([1; 32]);

        // (function name, byte used to build a distinct digest) — inserted in
        // the same order as the original test.
        for (func, digest_byte) in [
            ("f".to_string(), 0u8),
            ("Z".repeat(128), 1),
            ("f".repeat(128), 2),
            ("z".repeat(128), 3),
        ] {
            table
                .insert(
                    &(package, "mod".to_string(), func, 0),
                    &[digest_byte; 32].into(),
                )
                .unwrap();
        }

        let mut forward = store
            .get_transactions_by_move_function(
                package,
                Some("mod".to_string()),
                None,
                None,
                None,
                false,
            )
            .unwrap();
        let backward = store
            .get_transactions_by_move_function(
                package,
                Some("mod".to_string()),
                None,
                None,
                None,
                true,
            )
            .unwrap();
        forward.reverse();
        assert_eq!(forward, backward);
    }
}