1use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19 register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use sui_types::accumulator_event::AccumulatorEvent;
23use typed_store::TypedStoreError;
24use typed_store::rocksdb::compaction_filter::Decision;
25
26use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
27use sui_storage::mutex_table::MutexTable;
28use sui_storage::sharded_lru::ShardedLruCache;
29use sui_types::base_types::{
30 ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
31};
32use sui_types::base_types::{ObjectInfo, ObjectRef};
33use sui_types::digests::TransactionEventsDigest;
34use sui_types::dynamic_field::{self, DynamicFieldInfo};
35use sui_types::effects::TransactionEvents;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
37use sui_types::inner_temporary_store::TxCoins;
38use sui_types::object::{Object, Owner};
39use sui_types::parse_sui_struct_tag;
40use sui_types::storage::error::Error as StorageError;
41use tracing::{debug, info, instrument, trace};
42use typed_store::DBMapUtils;
43use typed_store::rocks::{
44 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, StagedBatch, default_db_options,
45 read_size_from_env,
46};
47use typed_store::traits::Map;
48
// Composite key/value aliases shared by the index tables below.

// Owner index key: (owning address, owned object id).
type OwnerIndexKey = (SuiAddress, ObjectID);
// Dynamic-field index key: (parent object id, field object id).
type DynamicFieldKey = (ObjectID, ObjectID);
// Event id: (transaction sequence number, event index within the transaction).
type EventId = (TxSequenceNumber, usize);
// Event index value: (events digest, transaction digest, timestamp in ms) —
// see how `index_tx` populates the event tables.
type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
// All balances for one address, keyed by coin type.
type AllBalance = HashMap<TypeTag, TotalBalance>;
54
/// Key for the `coin_index_2` table.
///
/// `inverted_balance` holds the bitwise complement of the coin balance
/// (see [`CoinIndexKey2::new`]), so iterating keys in ascending order visits
/// an owner's coins from largest to smallest balance.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CoinIndexKey2 {
    pub owner: SuiAddress,
    pub coin_type: String,
    // Stored as `!balance`; ascending key order == descending balance order.
    pub inverted_balance: u64,
    pub object_id: ObjectID,
}
64
65impl CoinIndexKey2 {
66 pub fn new_from_cursor(
67 owner: SuiAddress,
68 coin_type: String,
69 inverted_balance: u64,
70 object_id: ObjectID,
71 ) -> Self {
72 Self {
73 owner,
74 coin_type,
75 inverted_balance,
76 object_id,
77 }
78 }
79
80 pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
81 Self {
82 owner,
83 coin_type,
84 inverted_balance: !balance,
85 object_id,
86 }
87 }
88}
89
// Version stamped into the metadata record by `IndexStoreTables::init`.
const CURRENT_DB_VERSION: u64 = 0;
// Coin index column-family version; currently unreferenced (leading underscore).
const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
92
/// Database-wide metadata persisted in the `meta` table under the unit key.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    // Overall database version (see `CURRENT_DB_VERSION`).
    version: u64,
    // Per-column-family info, keyed by column family name.
    column_families: BTreeMap<String, ColumnFamilyInfo>,
}
103
/// Per-column-family metadata stored inside [`MetadataInfo`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ColumnFamilyInfo {
    version: u64,
}
108
// Query limits — enforced by the query methods (not visible in this chunk).
pub const MAX_TX_RANGE_SIZE: u64 = 4096;

pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
// Sizes the coin index block cache in MB (read by `coin_index_table_default_config`).
const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
// NOTE(review): read elsewhere in this file; presumably disables the balance
// caches — confirm against the cache read paths.
const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
// When set (>0), `commit_index_batch` invalidates cache entries instead of
// updating them in place.
const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
115
/// Aggregated balance for one (address, coin type) pair.
///
/// `balance` and `num_coins` are signed because `index_coin` also uses this
/// type to accumulate per-transaction deltas (which can be negative).
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
pub struct TotalBalance {
    pub balance: i128,
    pub num_coins: i64,
    pub address_balance: u64,
}
122
/// Per-transaction changes to apply to the owner and dynamic-field indexes.
#[derive(Debug)]
pub struct ObjectIndexChanges {
    // Entries to remove from `owner_index`.
    pub deleted_owners: Vec<OwnerIndexKey>,
    // Entries to remove from `dynamic_field_index`.
    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
    // Entries to add to `owner_index`.
    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
    // Entries to add to `dynamic_field_index`.
    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
}
130
/// Value stored in the coin index for a single coin object.
#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct CoinInfo {
    pub version: SequenceNumber,
    pub digest: ObjectDigest,
    pub balance: u64,
    pub previous_transaction: TransactionDigest,
}
138
139impl CoinInfo {
140 pub fn from_object(object: &Object) -> Option<CoinInfo> {
141 object.as_coin_maybe().map(|coin| CoinInfo {
142 version: object.version(),
143 digest: object.digest(),
144 previous_transaction: object.previous_transaction,
145 balance: coin.value(),
146 })
147 }
148}
149
/// Prometheus counters tracking how balance lookups are served.
pub struct IndexStoreMetrics {
    balance_lookup_from_db: IntCounter,
    balance_lookup_from_total: IntCounter,
    all_balance_lookup_from_db: IntCounter,
    all_balance_lookup_from_total: IntCounter,
}
156
157impl IndexStoreMetrics {
158 pub fn new(registry: &Registry) -> IndexStoreMetrics {
159 Self {
160 balance_lookup_from_db: register_int_counter_with_registry!(
161 "balance_lookup_from_db",
162 "Total number of balance requests served from cache",
163 registry,
164 )
165 .unwrap(),
166 balance_lookup_from_total: register_int_counter_with_registry!(
167 "balance_lookup_from_total",
168 "Total number of balance requests served ",
169 registry,
170 )
171 .unwrap(),
172 all_balance_lookup_from_db: register_int_counter_with_registry!(
173 "all_balance_lookup_from_db",
174 "Total number of all balance requests served from cache",
175 registry,
176 )
177 .unwrap(),
178 all_balance_lookup_from_total: register_int_counter_with_registry!(
179 "all_balance_lookup_from_total",
180 "Total number of all balance requests served",
181 registry,
182 )
183 .unwrap(),
184 }
185 }
186}
187
/// In-memory caches layered over the balance tables, plus per-address locks
/// used by `index_coin` to serialize concurrent balance changes.
pub struct IndexStoreCaches {
    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
    pub locks: MutexTable<SuiAddress>,
}
193
// Owned (Arc-backed) guard for entries handed out by the mutex table.
type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
195
/// Cache updates bundled with the address locks that were held while they
/// were computed; the locks are released when this value is dropped.
pub struct IndexStoreCacheUpdatesWithLocks {
    pub(crate) _locks: Option<Vec<OwnedMutexGuard<()>>>,
    pub(crate) inner: IndexStoreCacheUpdates,
}
204
impl IndexStoreCacheUpdatesWithLocks {
    /// Consumes `self`, dropping the held locks and returning the updates.
    pub fn into_inner(self) -> IndexStoreCacheUpdates {
        self.inner
    }
}
210
/// Balance-cache deltas computed while indexing a transaction; applied to the
/// caches by `commit_index_batch`.
#[derive(Default)]
pub struct IndexStoreCacheUpdates {
    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
}
218
/// RocksDB column families backing the JSON-RPC index.
#[derive(DBMapUtils)]
pub struct IndexStoreTables {
    // Singleton metadata record under the unit key — see `MetadataInfo`.
    meta: DBMap<(), MetadataInfo>,

    // Transactions indexed by sender address.
    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    // Transactions indexed by recipient (mutated-object owner) address.
    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    #[deprecated]
    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    #[deprecated]
    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    // Transactions indexed by called (package, module, function).
    transactions_by_move_function:
        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,

    // Sequence number -> digest, in sequencing order.
    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,

    // Digest -> sequence number (reverse of `transaction_order`).
    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,

    // (owner, object id) -> object info, for owned-object queries.
    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

    // Coin objects, keyed so iteration yields descending balances
    // (see `CoinIndexKey2`).
    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
    // Set of (address, coin type) pairs seen in accumulator balance writes.
    address_balances: DBMap<(SuiAddress, TypeTag), ()>,

    // (parent object, field object) -> dynamic field info.
    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,

    // Event indexes; all values are (events digest, tx digest, timestamp_ms).
    event_order: DBMap<EventId, EventIndex>,
    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
    event_by_time: DBMap<(u64, EventId), EventIndex>,

    // Entries below this sequence number may be dropped by the RocksDB
    // compaction filters (see `compaction_filter_config`).
    pruner_watermark: DBMap<(), TxSequenceNumber>,
}
278
279impl IndexStoreTables {
    /// Read access to the owned-object index.
    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
        &self.owner_index
    }
283
    /// Read access to the coin index.
    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
        &self.coin_index_2
    }
287
    /// Asserts that every index table in `self` matches `other`, panicking on
    /// the first divergence. Tables whose keys embed transaction sequence
    /// numbers or event ids are compared modulo those components (and event
    /// timestamps), since they can legitimately differ between databases.
    pub fn check_databases_equal(&self, other: &IndexStoreTables) {
        // Strict comparison: both tables must contain identical (key, value)
        // pairs in identical iteration order.
        fn assert_tables_equal<K, V>(name: &str, table_a: &DBMap<K, V>, table_b: &DBMap<K, V>)
        where
            K: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
            V: Serialize + DeserializeOwned + PartialEq + std::fmt::Debug,
        {
            let mut iter_a = table_a.safe_iter();
            let mut iter_b = table_b.safe_iter();
            let mut count = 0u64;
            loop {
                match (iter_a.next(), iter_b.next()) {
                    (Some(a), Some(b)) => {
                        let (ka, va): (K, V) = a.expect("failed to read from table_a");
                        let (kb, vb): (K, V) = b.expect("failed to read from table_b");
                        assert!(
                            ka == kb && va == vb,
                            "{name}: mismatch at entry {count}:\n a=({ka:?}, {va:?})\n b=({kb:?}, {vb:?})"
                        );
                        count += 1;
                    }
                    (None, None) => break,
                    // One side ran out first: report which table is longer.
                    (Some(_), None) => {
                        panic!(
                            "{name}: table_a has more entries than table_b (diverged after {count} entries)"
                        );
                    }
                    (None, Some(_)) => {
                        panic!(
                            "{name}: table_b has more entries than table_a (diverged after {count} entries)"
                        );
                    }
                }
            }
            info!("{name}: verified {count} entries are identical");
        }

        // Comparison for tables keyed by (prefix, tx sequence number): the
        // sequence component is dropped, then entries are compared as sorted,
        // deduplicated sets.
        fn assert_seq_table_equal<P, V>(
            name: &str,
            table_a: &DBMap<(P, TxSequenceNumber), V>,
            table_b: &DBMap<(P, TxSequenceNumber), V>,
        ) where
            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
            V: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
        {
            let mut entries_a: Vec<(P, V)> = table_a
                .safe_iter()
                .map(|r| {
                    let ((p, _seq), v) = r.expect("failed to read from table_a");
                    (p, v)
                })
                .collect();
            let mut entries_b: Vec<(P, V)> = table_b
                .safe_iter()
                .map(|r| {
                    let ((p, _seq), v) = r.expect("failed to read from table_b");
                    (p, v)
                })
                .collect();
            entries_a.sort();
            entries_a.dedup();
            entries_b.sort();
            entries_b.dedup();
            assert!(
                entries_a.len() == entries_b.len(),
                "{name}: different number of unique entries: {} vs {}",
                entries_a.len(),
                entries_b.len()
            );
            for (i, (a, b)) in entries_a.iter().zip(entries_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "{name}: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "{name}: verified {} unique entries match (ignoring sequence numbers)",
                entries_a.len()
            );
        }

        // Event-index value minus the timestamp component, which can differ
        // between databases.
        type EventKey = (TransactionEventsDigest, TransactionDigest);
        fn strip_timestamp(ei: EventIndex) -> EventKey {
            (ei.0, ei.1)
        }

        // Comparison for event tables keyed by (prefix, event id): both the
        // event id and the value's timestamp are ignored.
        fn assert_event_table_equal<P>(
            name: &str,
            table_a: &DBMap<(P, EventId), EventIndex>,
            table_b: &DBMap<(P, EventId), EventIndex>,
        ) where
            P: Serialize + DeserializeOwned + Ord + std::fmt::Debug,
        {
            let mut entries_a: Vec<(P, EventKey)> = table_a
                .safe_iter()
                .map(|r| {
                    let ((p, _eid), v) = r.expect("failed to read from table_a");
                    (p, strip_timestamp(v))
                })
                .collect();
            let mut entries_b: Vec<(P, EventKey)> = table_b
                .safe_iter()
                .map(|r| {
                    let ((p, _eid), v) = r.expect("failed to read from table_b");
                    (p, strip_timestamp(v))
                })
                .collect();
            entries_a.sort();
            entries_a.dedup();
            entries_b.sort();
            entries_b.dedup();
            assert!(
                entries_a.len() == entries_b.len(),
                "{name}: different number of unique entries: {} vs {}",
                entries_a.len(),
                entries_b.len()
            );
            for (i, (a, b)) in entries_a.iter().zip(entries_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "{name}: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "{name}: verified {} unique entries match (ignoring event ids and timestamps)",
                entries_a.len()
            );
        }

        // Object-keyed tables must match exactly.
        assert_tables_equal("owner_index", &self.owner_index, &other.owner_index);
        assert_tables_equal("coin_index_2", &self.coin_index_2, &other.coin_index_2);
        assert_tables_equal(
            "address_balances",
            &self.address_balances,
            &other.address_balances,
        );
        assert_tables_equal(
            "dynamic_field_index",
            &self.dynamic_field_index,
            &other.dynamic_field_index,
        );

        // Sequence-keyed tables are compared ignoring sequence numbers.
        assert_seq_table_equal(
            "transactions_from_addr",
            &self.transactions_from_addr,
            &other.transactions_from_addr,
        );
        assert_seq_table_equal(
            "transactions_to_addr",
            &self.transactions_to_addr,
            &other.transactions_to_addr,
        );
        {
            // `transactions_by_move_function` has a 4-tuple key, so the
            // sequence-table helper doesn't fit; same approach inline.
            let mut entries_a: Vec<((ObjectID, String, String), TransactionDigest)> = self
                .transactions_by_move_function
                .safe_iter()
                .map(|r| {
                    let ((pkg, module, func, _seq), digest) =
                        r.expect("failed to read from table_a");
                    ((pkg, module, func), digest)
                })
                .collect();
            let mut entries_b: Vec<((ObjectID, String, String), TransactionDigest)> = other
                .transactions_by_move_function
                .safe_iter()
                .map(|r| {
                    let ((pkg, module, func, _seq), digest) =
                        r.expect("failed to read from table_b");
                    ((pkg, module, func), digest)
                })
                .collect();
            entries_a.sort();
            entries_a.dedup();
            entries_b.sort();
            entries_b.dedup();
            assert!(
                entries_a.len() == entries_b.len(),
                "transactions_by_move_function: different number of unique entries: {} vs {}",
                entries_a.len(),
                entries_b.len()
            );
            for (i, (a, b)) in entries_a.iter().zip(entries_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "transactions_by_move_function: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "transactions_by_move_function: verified {} entries match (ignoring sequence numbers)",
                entries_a.len()
            );
        }

        {
            // `event_order` is keyed purely by event id, so only the
            // timestamp-stripped values are compared.
            let mut vals_a: Vec<EventKey> = self
                .event_order
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
                .collect();
            let mut vals_b: Vec<EventKey> = other
                .event_order
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
                .collect();
            vals_a.sort();
            vals_a.dedup();
            vals_b.sort();
            vals_b.dedup();
            assert!(
                vals_a.len() == vals_b.len(),
                "event_order: different number of entries: {} vs {}",
                vals_a.len(),
                vals_b.len()
            );
            for (i, (a, b)) in vals_a.iter().zip(vals_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "event_order: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "event_order: verified {} entries match (ignoring event ids and timestamps)",
                vals_a.len()
            );
        }
        assert_event_table_equal(
            "event_by_move_module",
            &self.event_by_move_module,
            &other.event_by_move_module,
        );
        assert_event_table_equal(
            "event_by_move_event",
            &self.event_by_move_event,
            &other.event_by_move_event,
        );
        assert_event_table_equal(
            "event_by_event_module",
            &self.event_by_event_module,
            &other.event_by_event_module,
        );
        assert_event_table_equal(
            "event_by_sender",
            &self.event_by_sender,
            &other.event_by_sender,
        );
        {
            // `event_by_time` keys embed the timestamp itself, so keys are
            // ignored entirely and only stripped values are compared.
            let mut vals_a: Vec<EventKey> = self
                .event_by_time
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_a").1))
                .collect();
            let mut vals_b: Vec<EventKey> = other
                .event_by_time
                .safe_iter()
                .map(|r| strip_timestamp(r.expect("failed to read from table_b").1))
                .collect();
            vals_a.sort();
            vals_a.dedup();
            vals_b.sort();
            vals_b.dedup();
            assert!(
                vals_a.len() == vals_b.len(),
                "event_by_time: different number of entries: {} vs {}",
                vals_a.len(),
                vals_b.len()
            );
            for (i, (a, b)) in vals_a.iter().zip(vals_b.iter()).enumerate() {
                assert!(
                    a == b,
                    "event_by_time: mismatch at sorted entry {i}:\n a={a:?}\n b={b:?}"
                );
            }
            info!(
                "event_by_time: verified {} entries match (ignoring timestamps and event ids)",
                vals_a.len()
            );
        }

    }
588
589 #[allow(deprecated)]
590 fn init(&mut self) -> Result<(), StorageError> {
591 let metadata = {
592 match self.meta.get(&()) {
593 Ok(Some(metadata)) => metadata,
594 Ok(None) | Err(_) => MetadataInfo {
595 version: CURRENT_DB_VERSION,
596 column_families: BTreeMap::new(),
597 },
598 }
599 };
600
601 self.meta.insert(&(), &metadata)?;
603
604 Ok(())
605 }
606
607 pub fn get_dynamic_fields_iterator(
608 &self,
609 object: ObjectID,
610 cursor: Option<ObjectID>,
611 ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
612 debug!(?object, "get_dynamic_fields");
613 let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
615 let iter_upper_bound = (object, ObjectID::MAX);
616 self.dynamic_field_index
617 .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
618 .skip(usize::from(cursor.is_some()))
620 .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
621 .map_ok(|((_, c), object_info)| (c, object_info))
622 }
623}
624
/// JSON-RPC index store: secondary indexes over transactions, objects, coins,
/// and events, with in-memory balance caches layered on top.
pub struct IndexStore {
    // Next unused transaction sequence number (see `allocate_sequence_number`).
    next_sequence_number: AtomicU64,
    tables: IndexStoreTables,
    pub caches: IndexStoreCaches,
    metrics: Arc<IndexStoreMetrics>,
    // NOTE(review): presumably a limit on type-string length in queries —
    // enforcement is not visible in this chunk.
    max_type_length: u64,
    // When true, deprecated tables are dropped on open and never written.
    remove_deprecated_tables: bool,
    // Shared with the RocksDB compaction filters; entries below this sequence
    // number are removed during compaction.
    pruner_watermark: Arc<AtomicU64>,
}
634
/// Counters for compaction-filter decisions, labeled by column family.
struct JsonRpcCompactionMetrics {
    key_removed: IntCounterVec,
    key_kept: IntCounterVec,
    key_error: IntCounterVec,
}
640
641impl JsonRpcCompactionMetrics {
642 pub fn new(registry: &Registry) -> Arc<Self> {
643 Arc::new(Self {
644 key_removed: register_int_counter_vec_with_registry!(
645 "json_rpc_compaction_filter_key_removed",
646 "Compaction key removed",
647 &["cf"],
648 registry
649 )
650 .unwrap(),
651 key_kept: register_int_counter_vec_with_registry!(
652 "json_rpc_compaction_filter_key_kept",
653 "Compaction key kept",
654 &["cf"],
655 registry
656 )
657 .unwrap(),
658 key_error: register_int_counter_vec_with_registry!(
659 "json_rpc_compaction_filter_key_error",
660 "Compaction error",
661 &["cf"],
662 registry
663 )
664 .unwrap(),
665 })
666 }
667}
668
669fn compaction_filter_config<T: DeserializeOwned>(
670 name: &str,
671 metrics: Arc<JsonRpcCompactionMetrics>,
672 mut db_options: DBOptions,
673 pruner_watermark: Arc<AtomicU64>,
674 extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
675 by_key: bool,
676) -> DBOptions {
677 let cf = name.to_string();
678 db_options
679 .options
680 .set_compaction_filter(name, move |_, key, value| {
681 let bytes = if by_key { key } else { value };
682 let deserializer = bincode::DefaultOptions::new()
683 .with_big_endian()
684 .with_fixint_encoding();
685 match deserializer.deserialize(bytes) {
686 Ok(key_data) => {
687 let sequence_number = extractor(key_data);
688 if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
689 metrics.key_removed.with_label_values(&[&cf]).inc();
690 Decision::Remove
691 } else {
692 metrics.key_kept.with_label_values(&[&cf]).inc();
693 Decision::Keep
694 }
695 }
696 Err(_) => {
697 metrics.key_error.with_label_values(&[&cf]).inc();
698 Decision::Keep
699 }
700 }
701 });
702 db_options
703}
704
/// Convenience wrapper around [`compaction_filter_config`] for tables whose
/// sequence number is encoded in the key (`by_key = true`).
fn compaction_filter_config_by_key<T: DeserializeOwned>(
    name: &str,
    metrics: Arc<JsonRpcCompactionMetrics>,
    db_options: DBOptions,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
) -> DBOptions {
    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
}
714
715fn coin_index_table_default_config() -> DBOptions {
716 default_db_options()
717 .optimize_for_write_throughput()
718 .optimize_for_read(
719 read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
720 )
721 .disable_write_throttling()
722}
723
724impl IndexStore {
    /// Opens the index tables at `path` without writing the metadata record
    /// (see [`IndexStore::new`] for the initializing variant).
    ///
    /// Wires a compaction filter into each prunable column family so RocksDB
    /// drops entries below the pruner watermark during compaction, then
    /// restores `next_sequence_number` and the watermark from the database.
    pub fn new_without_init(
        path: PathBuf,
        registry: &Registry,
        max_type_length: Option<u64>,
        remove_deprecated_tables: bool,
    ) -> Self {
        let db_options = default_db_options().disable_write_throttling();
        // Shared with every compaction filter closure; loaded from the DB
        // below once the tables are open.
        let pruner_watermark = Arc::new(AtomicU64::new(0));
        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
        // Per-column-family options. Each extractor pulls the tx sequence
        // number (or the event id's tx component) out of the decoded key.
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "transactions_from_addr".to_string(),
                compaction_filter_config_by_key(
                    "transactions_from_addr",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
                ),
            ),
            (
                "transactions_to_addr".to_string(),
                compaction_filter_config_by_key(
                    "transactions_to_addr",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
                ),
            ),
            (
                "transactions_by_move_function".to_string(),
                compaction_filter_config_by_key(
                    "transactions_by_move_function",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
                ),
            ),
            (
                "transaction_order".to_string(),
                compaction_filter_config_by_key(
                    "transaction_order",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |sequence_number: TxSequenceNumber| sequence_number,
                ),
            ),
            (
                // `transactions_seq` is keyed by digest; its sequence number
                // lives in the value, hence `by_key = false`.
                "transactions_seq".to_string(),
                compaction_filter_config(
                    "transactions_seq",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |sequence_number: TxSequenceNumber| sequence_number,
                    false,
                ),
            ),
            (
                // Coin index is not pruned by sequence number; it only gets
                // the read/write-tuned options.
                "coin_index_2".to_string(),
                coin_index_table_default_config(),
            ),
            (
                "event_order".to_string(),
                compaction_filter_config_by_key(
                    "event_order",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |event_id: EventId| event_id.0,
                ),
            ),
            (
                "event_by_move_module".to_string(),
                compaction_filter_config_by_key(
                    "event_by_move_module",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (ModuleId, EventId)| event_id.0,
                ),
            ),
            // NOTE(review): `event_by_move_event` has no compaction-filter
            // entry here, unlike the other event tables — confirm whether
            // that table is intentionally left unpruned.
            (
                "event_by_event_module".to_string(),
                compaction_filter_config_by_key(
                    "event_by_event_module",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (ModuleId, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_sender".to_string(),
                compaction_filter_config_by_key(
                    "event_by_sender",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_time".to_string(),
                compaction_filter_config_by_key(
                    "event_by_time",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (u64, EventId)| event_id.0,
                ),
            ),
        ]));
        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path,
            MetricConf::new("index"),
            Some(db_options.options),
            Some(table_options),
            remove_deprecated_tables,
        );

        let metrics = IndexStoreMetrics::new(registry);
        let caches = IndexStoreCaches {
            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
            all_balances: ShardedLruCache::new(1_000_000, 1000),
            locks: MutexTable::new(128),
        };
        // Resume sequencing after the highest recorded sequence number, or
        // start at 0 for an empty database.
        let next_sequence_number = tables
            .transaction_order
            .reversed_safe_iter_with_bounds(None, None)
            .expect("failed to initialize indexes")
            .next()
            .transpose()
            .expect("failed to initialize indexes")
            .map(|(seq, _)| seq + 1)
            .unwrap_or(0)
            .into();
        // Restore the persisted watermark so compaction filters keep pruning
        // from where the pruner left off.
        let pruner_watermark_value = tables
            .pruner_watermark
            .get(&())
            .expect("failed to initialize index tables")
            .unwrap_or(0);
        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);

        Self {
            tables,
            next_sequence_number,
            caches,
            metrics: Arc::new(metrics),
            max_type_length: max_type_length.unwrap_or(128),
            remove_deprecated_tables,
            pruner_watermark,
        }
    }
882
883 pub fn new(
884 path: PathBuf,
885 registry: &Registry,
886 max_type_length: Option<u64>,
887 remove_deprecated_tables: bool,
888 ) -> Self {
889 let mut store =
890 Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
891 store.tables.init().unwrap();
892 store
893 }
894
    /// Read access to the underlying index tables.
    pub fn tables(&self) -> &IndexStoreTables {
        &self.tables
    }
898
    #[instrument(skip_all)]
    /// Stages coin-index changes for one transaction into `batch` and returns
    /// the balance-cache deltas (plus the address locks held while computing
    /// them, when `acquire_locks` is set).
    ///
    /// Input coins are deleted from the index and written coins re-inserted;
    /// per-(address, coin type) deltas are accumulated along the way.
    pub fn index_coin(
        &self,
        digest: &TransactionDigest,
        batch: &mut StagedBatch,
        object_index_changes: &ObjectIndexChanges,
        tx_coins: Option<TxCoins>,
        acquire_locks: bool,
    ) -> SuiResult<IndexStoreCacheUpdatesWithLocks> {
        // Only transactions that touch coins carry `tx_coins`; nothing to do
        // otherwise.
        if tx_coins.is_none() {
            return Ok(IndexStoreCacheUpdatesWithLocks {
                _locks: None,
                inner: IndexStoreCacheUpdates::default(),
            });
        }

        // Lock every address whose ownership changes, serializing balance
        // updates for those addresses against concurrent indexing.
        let _locks = if acquire_locks {
            let mut addresses: HashSet<SuiAddress> = HashSet::new();
            addresses.extend(
                object_index_changes
                    .deleted_owners
                    .iter()
                    .map(|(owner, _)| *owner),
            );
            addresses.extend(
                object_index_changes
                    .new_owners
                    .iter()
                    .map(|((owner, _), _)| *owner),
            );
            Some(self.caches.locks.acquire_locks(addresses.into_iter()))
        } else {
            None
        };

        let (input_coins, written_coins) = tx_coins.unwrap();
        // Accumulated per-address, per-coin-type deltas for the caches.
        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
            HashMap::new();

        // Input coins leave the index; record negative deltas.
        let coin_delete_keys = input_coins
            .values()
            .filter_map(|object| {
                // Only address-owned coins are indexed.
                let Owner::AddressOwner(owner) = object.owner() else {
                    return None;
                };

                let (coin_type, coin) = object
                    .coin_type_maybe()
                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;

                let key = CoinIndexKey2::new(
                    *owner,
                    coin_type.to_string(),
                    coin.balance.value(),
                    object.id(),
                );

                let map = balance_changes.entry(*owner).or_default();
                let entry = map.entry(coin_type).or_insert(TotalBalance {
                    num_coins: 0,
                    balance: 0,
                    address_balance: 0,
                });
                entry.num_coins -= 1;
                entry.balance -= coin.balance.value() as i128;

                Some(key)
            })
            .collect::<Vec<_>>();
        // NOTE(review): tracing field name "tx_digset" is a pre-existing typo
        // (left unchanged here; renaming it would alter emitted trace fields).
        trace!(
            tx_digset=?digest,
            "coin_delete_keys: {:?}",
            coin_delete_keys,
        );
        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;

        // Written coins (re-)enter the index; record positive deltas.
        let coin_add_keys = written_coins
            .values()
            .filter_map(|object| {
                let Owner::AddressOwner(owner) = object.owner() else {
                    return None;
                };

                let (coin_type, coin) = object
                    .coin_type_maybe()
                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;

                let key = CoinIndexKey2::new(
                    *owner,
                    coin_type.to_string(),
                    coin.balance.value(),
                    object.id(),
                );
                let value = CoinInfo {
                    version: object.version(),
                    digest: object.digest(),
                    balance: coin.balance.value(),
                    previous_transaction: object.previous_transaction,
                };
                let map = balance_changes.entry(*owner).or_default();
                let entry = map.entry(coin_type).or_insert(TotalBalance {
                    num_coins: 0,
                    balance: 0,
                    address_balance: 0,
                });
                entry.num_coins += 1;
                entry.balance += coin.balance.value() as i128;

                Some((key, value))
            })
            .collect::<Vec<_>>();
        trace!(
            tx_digset=?digest,
            "coin_add_keys: {:?}",
            coin_add_keys,
        );

        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;

        // Reshape the accumulated deltas into the forms the two caches expect.
        let per_coin_type_balance_changes: Vec<_> = balance_changes
            .iter()
            .flat_map(|(address, balance_map)| {
                balance_map.iter().map(|(type_tag, balance)| {
                    (
                        (*address, type_tag.clone()),
                        Ok::<TotalBalance, SuiError>(*balance),
                    )
                })
            })
            .collect();
        let all_balance_changes: Vec<_> = balance_changes
            .into_iter()
            .map(|(address, balance_map)| {
                (
                    address,
                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
                )
            })
            .collect();
        let cache_updates = IndexStoreCacheUpdatesWithLocks {
            _locks,
            inner: IndexStoreCacheUpdates {
                per_coin_type_balance_changes,
                all_balance_changes,
            },
        };
        Ok(cache_updates)
    }
1056
    /// Atomically reserves and returns the next transaction sequence number
    /// (post-increment: the returned value is the one allocated).
    pub fn allocate_sequence_number(&self) -> u64 {
        self.next_sequence_number.fetch_add(1, Ordering::SeqCst)
    }
1060
    #[instrument(skip_all)]
    /// Stages all index writes for one executed transaction into a single
    /// batch: transaction indexes, coin index (via `index_coin`), accumulator
    /// balance keys, owner/dynamic-field indexes, and event indexes.
    ///
    /// Returns the staged batch together with the balance-cache deltas; the
    /// caller commits both via `commit_index_batch`.
    pub fn index_tx(
        &self,
        sequence: u64,
        sender: SuiAddress,
        active_inputs: impl Iterator<Item = ObjectID>,
        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
        events: &TransactionEvents,
        object_index_changes: ObjectIndexChanges,
        digest: &TransactionDigest,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        accumulator_events: Vec<AccumulatorEvent>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let mut batch = StagedBatch::new();

        // Sequence number <-> digest mappings.
        batch.insert_batch(
            &self.tables.transaction_order,
            std::iter::once((sequence, *digest)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_seq,
            std::iter::once((*digest, sequence)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_from_addr,
            std::iter::once(((sender, sequence), *digest)),
        )?;

        // Deprecated tables are only maintained while they still exist.
        #[allow(deprecated)]
        if !self.remove_deprecated_tables {
            batch.insert_batch(
                &self.tables.transactions_by_input_object_id,
                active_inputs.map(|id| ((id, sequence), *digest)),
            )?;

            batch.insert_batch(
                &self.tables.transactions_by_mutated_object_id,
                mutated_objects
                    .clone()
                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
            )?;
        }

        batch.insert_batch(
            &self.tables.transactions_by_move_function,
            move_functions
                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
        )?;

        // Recipient index: one entry per address-owned mutated object.
        batch.insert_batch(
            &self.tables.transactions_to_addr,
            mutated_objects.filter_map(|(_, owner)| {
                owner
                    .get_address_owner_address()
                    .ok()
                    .map(|addr| ((addr, sequence), digest))
            }),
        )?;

        // Coin-index changes plus balance-cache deltas (holds address locks
        // when `acquire_locks` is set; released when the return value drops).
        let cache_updates = self.index_coin(
            digest,
            &mut batch,
            &object_index_changes,
            tx_coins,
            acquire_locks,
        )?;

        // Record (address, coin type) pairs touched by accumulator balance
        // writes.
        let address_balance_updates = accumulator_events.into_iter().filter_map(|event| {
            let ty = &event.write.address.ty;
            let coin_type = sui_types::balance::Balance::maybe_get_balance_type_param(ty)?;
            Some(((event.write.address.address, coin_type), ()))
        });
        batch.insert_batch(&self.tables.address_balances, address_balance_updates)?;

        // Owner and dynamic-field index maintenance: deletions first, then
        // insertions.
        batch.delete_batch(
            &self.tables.owner_index,
            object_index_changes.deleted_owners.into_iter(),
        )?;
        batch.delete_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.deleted_dynamic_fields.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.owner_index,
            object_index_changes.new_owners.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.new_dynamic_fields.into_iter(),
        )?;

        // Event indexes: every table stores the same
        // (events digest, tx digest, timestamp) value under a different key.
        let event_digest = events.digest();
        batch.insert_batch(
            &self.tables.event_order,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
        )?;
        // Keyed by the module that emitted the event.
        batch.insert_batch(
            &self.tables.event_by_move_module,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, e)| {
                    (
                        i,
                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
                    )
                })
                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
        )?;
        batch.insert_batch(
            &self.tables.event_by_sender,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.sender, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;
        // Keyed by the event's struct type.
        batch.insert_batch(
            &self.tables.event_by_move_event,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.type_.clone(), (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        batch.insert_batch(
            &self.tables.event_by_time,
            events.data.iter().enumerate().map(|(i, _)| {
                (
                    (timestamp_ms, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        // Keyed by the module that declares the event type (as opposed to the
        // emitting module above).
        batch.insert_batch(
            &self.tables.event_by_event_module,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (
                        ModuleId::new(e.type_.address, e.type_.module.clone()),
                        (sequence, i),
                    ),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        Ok((batch, cache_updates))
    }
1230
1231 pub fn commit_index_batch(
1233 &self,
1234 batch: DBBatch,
1235 cache_updates: Vec<IndexStoreCacheUpdates>,
1236 ) -> SuiResult {
1237 let invalidate_caches =
1238 read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
1239
1240 if invalidate_caches {
1241 for cu in &cache_updates {
1242 self.invalidate_per_coin_type_cache(
1243 cu.per_coin_type_balance_changes.iter().map(|x| x.0.clone()),
1244 )?;
1245 self.invalidate_all_balance_cache(cu.all_balance_changes.iter().map(|x| x.0))?;
1246 }
1247 }
1248
1249 batch.write()?;
1250
1251 if !invalidate_caches {
1252 for cu in cache_updates {
1253 self.update_per_coin_type_cache(cu.per_coin_type_balance_changes)?;
1254 self.update_all_balance_cache(cu.all_balance_changes)?;
1255 }
1256 }
1257 Ok(())
1258 }
1259
    /// Creates an empty write batch for the index store's underlying RocksDB.
    /// The batch is created via the `transactions_from_addr` handle, but can carry
    /// writes for any of this store's tables.
    pub fn new_db_batch(&self) -> DBBatch {
        self.tables.transactions_from_addr.batch()
    }
1263
    /// Returns the next transaction sequence number that will be handed out,
    /// i.e. one past the currently stored counter value.
    pub fn next_sequence_number(&self) -> TxSequenceNumber {
        self.next_sequence_number.load(Ordering::SeqCst) + 1
    }
1267
    /// Returns transaction digests matching `filter`, paginated by `cursor` and
    /// `limit`, in ascending or descending sequence order.
    ///
    /// The cursor digest is first resolved to its local sequence number; iteration
    /// then starts just after it (the `skip` below makes the cursor exclusive).
    /// Only the filter variants handled in the `match` are supported here; any
    /// other variant yields `UserInputError::Unsupported`.
    #[instrument(skip(self))]
    pub fn get_transactions(
        &self,
        filter: Option<TransactionFilter>,
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        // Resolve the cursor digest to a sequence number; an unknown digest is an error.
        let cursor = if let Some(cursor) = cursor {
            Some(
                self.get_transaction_seq(&cursor)?
                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
            )
        } else {
            None
        };
        match filter {
            Some(TransactionFilter::MoveFunction {
                package,
                module,
                function,
            }) => Ok(self.get_transactions_by_move_function(
                package, module, function, cursor, limit, reverse,
            )?),
            Some(TransactionFilter::InputObject(object_id)) => {
                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::ChangedObject(object_id)) => {
                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::FromAddress(address)) => {
                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::ToAddress(address)) => {
                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
            }
            // Remaining filter variants are not indexed by this store.
            Some(_) => Err(SuiErrorKind::UserInputError {
                error: UserInputError::Unsupported(format!("{:?}", filter)),
            }
            .into()),
            None => {
                // No filter: walk the global transaction-order table.
                if reverse {
                    let iter = self
                        .tables
                        .transaction_order
                        .reversed_safe_iter_with_bounds(
                            None,
                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
                        )?
                        // Exclude the cursor entry itself from the page.
                        .skip(usize::from(cursor.is_some()))
                        .map(|result| result.map(|(_, digest)| digest));
                    if let Some(limit) = limit {
                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
                    } else {
                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
                    }
                } else {
                    let iter = self
                        .tables
                        .transaction_order
                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
                        // Exclude the cursor entry itself from the page.
                        .skip(usize::from(cursor.is_some()))
                        .map(|result| result.map(|(_, digest)| digest));
                    if let Some(limit) = limit {
                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
                    } else {
                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
                    }
                }
            }
        }
    }
1343
1344 #[instrument(skip_all)]
1345 fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
1346 index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
1347 key: KeyT,
1348 cursor: Option<TxSequenceNumber>,
1349 limit: Option<usize>,
1350 reverse: bool,
1351 ) -> SuiResult<Vec<TransactionDigest>> {
1352 Ok(if reverse {
1353 let iter = index
1354 .reversed_safe_iter_with_bounds(
1355 None,
1356 Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
1357 )?
1358 .skip(usize::from(cursor.is_some()))
1360 .take_while(|result| {
1361 result
1362 .as_ref()
1363 .map(|((id, _), _)| *id == key)
1364 .unwrap_or(false)
1365 })
1366 .map(|result| result.map(|(_, digest)| digest));
1367 if let Some(limit) = limit {
1368 iter.take(limit).collect::<Result<Vec<_>, _>>()?
1369 } else {
1370 iter.collect::<Result<Vec<_>, _>>()?
1371 }
1372 } else {
1373 let iter = index
1374 .safe_iter_with_bounds(
1375 Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
1376 None,
1377 )
1378 .skip(usize::from(cursor.is_some()))
1380 .map(|result| result.expect("iterator db error"))
1381 .take_while(|((id, _), _)| *id == key)
1382 .map(|(_, digest)| digest);
1383 if let Some(limit) = limit {
1384 iter.take(limit).collect()
1385 } else {
1386 iter.collect()
1387 }
1388 })
1389 }
1390
1391 #[instrument(skip(self))]
1392 pub fn get_transactions_by_input_object(
1393 &self,
1394 input_object: ObjectID,
1395 cursor: Option<TxSequenceNumber>,
1396 limit: Option<usize>,
1397 reverse: bool,
1398 ) -> SuiResult<Vec<TransactionDigest>> {
1399 if self.remove_deprecated_tables {
1400 return Ok(vec![]);
1401 }
1402 #[allow(deprecated)]
1403 Self::get_transactions_from_index(
1404 &self.tables.transactions_by_input_object_id,
1405 input_object,
1406 cursor,
1407 limit,
1408 reverse,
1409 )
1410 }
1411
1412 #[instrument(skip(self))]
1413 pub fn get_transactions_by_mutated_object(
1414 &self,
1415 mutated_object: ObjectID,
1416 cursor: Option<TxSequenceNumber>,
1417 limit: Option<usize>,
1418 reverse: bool,
1419 ) -> SuiResult<Vec<TransactionDigest>> {
1420 if self.remove_deprecated_tables {
1421 return Ok(vec![]);
1422 }
1423 #[allow(deprecated)]
1424 Self::get_transactions_from_index(
1425 &self.tables.transactions_by_mutated_object_id,
1426 mutated_object,
1427 cursor,
1428 limit,
1429 reverse,
1430 )
1431 }
1432
    /// Looks up transactions sent by `addr`, via the `transactions_from_addr`
    /// index, paginated by `cursor`/`limit` in the requested direction.
    #[instrument(skip(self))]
    pub fn get_transactions_from_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_from_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }
1449
1450 #[instrument(skip(self))]
1451 pub fn get_transactions_by_move_function(
1452 &self,
1453 package: ObjectID,
1454 module: Option<String>,
1455 function: Option<String>,
1456 cursor: Option<TxSequenceNumber>,
1457 limit: Option<usize>,
1458 reverse: bool,
1459 ) -> SuiResult<Vec<TransactionDigest>> {
1460 if function.is_some() && module.is_none() {
1462 return Err(SuiErrorKind::UserInputError {
1463 error: UserInputError::MoveFunctionInputError(
1464 "Cannot supply function without supplying module".to_string(),
1465 ),
1466 }
1467 .into());
1468 }
1469
1470 if cursor.is_some() && (module.is_none() || function.is_none()) {
1472 return Err(SuiErrorKind::UserInputError {
1473 error: UserInputError::MoveFunctionInputError(
1474 "Cannot supply cursor without supplying module and function".to_string(),
1475 ),
1476 }
1477 .into());
1478 }
1479
1480 let cursor_val = cursor.unwrap_or(if reverse {
1481 TxSequenceNumber::MAX
1482 } else {
1483 TxSequenceNumber::MIN
1484 });
1485
1486 let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1487 let module_val = module.clone().unwrap_or(if reverse {
1488 max_string.clone()
1489 } else {
1490 "".to_string()
1491 });
1492
1493 let function_val =
1494 function
1495 .clone()
1496 .unwrap_or(if reverse { max_string } else { "".to_string() });
1497
1498 let key = (package, module_val, function_val, cursor_val);
1499 Ok(if reverse {
1500 let iter = self
1501 .tables
1502 .transactions_by_move_function
1503 .reversed_safe_iter_with_bounds(None, Some(key))?
1504 .skip(usize::from(cursor.is_some()))
1506 .take_while(|result| {
1507 result
1508 .as_ref()
1509 .map(|((id, m, f, _), _)| {
1510 *id == package
1511 && module.as_ref().map(|x| x == m).unwrap_or(true)
1512 && function.as_ref().map(|x| x == f).unwrap_or(true)
1513 })
1514 .unwrap_or(false)
1515 })
1516 .map(|result| result.map(|(_, digest)| digest));
1517 if let Some(limit) = limit {
1518 iter.take(limit).collect::<Result<Vec<_>, _>>()?
1519 } else {
1520 iter.collect::<Result<Vec<_>, _>>()?
1521 }
1522 } else {
1523 let iter = self
1524 .tables
1525 .transactions_by_move_function
1526 .safe_iter_with_bounds(Some(key), None)
1527 .map(|result| result.expect("iterator db error"))
1528 .skip(usize::from(cursor.is_some()))
1530 .take_while(|((id, m, f, _), _)| {
1531 *id == package
1532 && module.as_ref().map(|x| x == m).unwrap_or(true)
1533 && function.as_ref().map(|x| x == f).unwrap_or(true)
1534 })
1535 .map(|(_, digest)| digest);
1536 if let Some(limit) = limit {
1537 iter.take(limit).collect()
1538 } else {
1539 iter.collect()
1540 }
1541 })
1542 }
1543
    /// Looks up transactions whose effects assigned an object to `addr`
    /// (recipient-side index), via the `transactions_to_addr` table, paginated by
    /// `cursor`/`limit` in the requested direction.
    #[instrument(skip(self))]
    pub fn get_transactions_to_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_to_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }
1560
    /// Looks up the local sequence number previously assigned to `digest`.
    /// Returns `Ok(None)` when the transaction has not been indexed.
    #[instrument(skip(self))]
    pub fn get_transaction_seq(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TxSequenceNumber>> {
        Ok(self.tables.transactions_seq.get(digest)?)
    }
1568
1569 #[instrument(skip(self))]
1570 pub fn all_events(
1571 &self,
1572 tx_seq: TxSequenceNumber,
1573 event_seq: usize,
1574 limit: usize,
1575 descending: bool,
1576 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1577 Ok(if descending {
1578 self.tables
1579 .event_order
1580 .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1581 .take(limit)
1582 .map(|result| {
1583 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1584 (digest, tx_digest, event_seq, time)
1585 })
1586 })
1587 .collect::<Result<Vec<_>, _>>()?
1588 } else {
1589 self.tables
1590 .event_order
1591 .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1592 .take(limit)
1593 .map(|result| {
1594 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1595 (digest, tx_digest, event_seq, time)
1596 })
1597 })
1598 .collect::<Result<Vec<_>, _>>()?
1599 })
1600 }
1601
1602 #[instrument(skip(self))]
1603 pub fn events_by_transaction(
1604 &self,
1605 digest: &TransactionDigest,
1606 tx_seq: TxSequenceNumber,
1607 event_seq: usize,
1608 limit: usize,
1609 descending: bool,
1610 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1611 let seq = self
1612 .get_transaction_seq(digest)?
1613 .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1614 Ok(if descending {
1615 self.tables
1616 .event_order
1617 .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1618 .take_while(|result| {
1619 result
1620 .as_ref()
1621 .map(|((tx, _), _)| tx == &seq)
1622 .unwrap_or(false)
1623 })
1624 .take(limit)
1625 .map(|result| {
1626 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1627 (digest, tx_digest, event_seq, time)
1628 })
1629 })
1630 .collect::<Result<Vec<_>, _>>()?
1631 } else {
1632 self.tables
1633 .event_order
1634 .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1635 .map(|result| result.expect("iterator db error"))
1636 .take_while(|((tx, _), _)| tx == &seq)
1637 .take(limit)
1638 .map(|((_, event_seq), (digest, tx_digest, time))| {
1639 (digest, tx_digest, event_seq, time)
1640 })
1641 .collect()
1642 })
1643 }
1644
1645 #[instrument(skip_all)]
1646 fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1647 index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1648 key: &KeyT,
1649 tx_seq: TxSequenceNumber,
1650 event_seq: usize,
1651 limit: usize,
1652 descending: bool,
1653 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1654 Ok(if descending {
1655 index
1656 .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1657 .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1658 .take(limit)
1659 .map(|result| {
1660 result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1661 (digest, tx_digest, event_seq, time)
1662 })
1663 })
1664 .collect::<Result<Vec<_>, _>>()?
1665 } else {
1666 index
1667 .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1668 .map(|result| result.expect("iterator db error"))
1669 .take_while(|((m, _), _)| m == key)
1670 .take(limit)
1671 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1672 (digest, tx_digest, event_seq, time)
1673 })
1674 .collect()
1675 })
1676 }
1677
    /// Returns up to `limit` events emitted from transactions whose entry module
    /// is `module`, via the `event_by_move_module` index.
    #[instrument(skip(self))]
    pub fn events_by_module_id(
        &self,
        module: &ModuleId,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_move_module,
            module,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1696
    /// Returns up to `limit` events whose Move event type equals `struct_name`,
    /// via the `event_by_move_event` index (keyed on the event's type tag).
    #[instrument(skip(self))]
    pub fn events_by_move_event_struct_name(
        &self,
        struct_name: &StructTag,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_move_event,
            struct_name,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1715
    /// Returns up to `limit` events whose event *type* is declared in `module_id`
    /// (the module of the event struct, not the entry module), via the
    /// `event_by_event_module` index.
    #[instrument(skip(self))]
    pub fn events_by_move_event_module(
        &self,
        module_id: &ModuleId,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_event_module,
            module_id,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1734
    /// Returns up to `limit` events whose sender is `sender`, via the
    /// `event_by_sender` index.
    #[instrument(skip(self))]
    pub fn events_by_sender(
        &self,
        sender: &SuiAddress,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_sender,
            sender,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }
1753
1754 #[instrument(skip(self))]
1755 pub fn event_iterator(
1756 &self,
1757 start_time: u64,
1758 end_time: u64,
1759 tx_seq: TxSequenceNumber,
1760 event_seq: usize,
1761 limit: usize,
1762 descending: bool,
1763 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1764 Ok(if descending {
1765 self.tables
1766 .event_by_time
1767 .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1768 .take_while(|result| {
1769 result
1770 .as_ref()
1771 .map(|((m, _), _)| m >= &start_time)
1772 .unwrap_or(false)
1773 })
1774 .take(limit)
1775 .map(|result| {
1776 result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1777 (digest, tx_digest, event_seq, time)
1778 })
1779 })
1780 .collect::<Result<Vec<_>, _>>()?
1781 } else {
1782 self.tables
1783 .event_by_time
1784 .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1785 .map(|result| result.expect("iterator db error"))
1786 .take_while(|((m, _), _)| m <= &end_time)
1787 .take(limit)
1788 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1789 (digest, tx_digest, event_seq, time)
1790 })
1791 .collect()
1792 })
1793 }
1794
    /// Computes a pruning watermark from the `event_by_time` index: the sequence
    /// number of the newest transaction whose events are timestamped at or before
    /// `cut_time_ms`.
    ///
    /// The watermark is persisted to `tables.pruner_watermark` and mirrored into
    /// the in-memory atomic. Returns `Ok(0)` when no event is old enough.
    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
        match self
            .tables
            .event_by_time
            .reversed_safe_iter_with_bounds(
                None,
                // MAX/MAX padding makes the bound inclusive of everything at
                // `cut_time_ms` itself.
                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
            )?
            // Only the newest qualifying entry is needed.
            .next()
            .transpose()?
        {
            Some(((_, (watermark, _)), _)) => {
                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
                    info!(
                        "json rpc index pruning. Watermark is {} with digest {}",
                        watermark, digest
                    );
                }
                // NOTE(review): Relaxed ordering presumably suffices because readers
                // only need an eventually-visible bound — confirm against readers
                // of `pruner_watermark`.
                self.pruner_watermark.store(watermark, Ordering::Relaxed);
                self.tables.pruner_watermark.insert(&(), &watermark)?;
                Ok(watermark)
            }
            None => Ok(0),
        }
    }
1820
    /// Iterates the dynamic fields of `object`, optionally resuming after `cursor`.
    /// Delegates to the tables helper; each item is `(field_object_id, info)` or a
    /// store error surfaced through the iterator.
    pub fn get_dynamic_fields_iterator(
        &self,
        object: ObjectID,
        cursor: Option<ObjectID>,
    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
    {
        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
    }
1829
    /// Resolves the object id stored under the dynamic field `(object, name)`.
    ///
    /// First tries the id derived from `name_type` directly; if that is not
    /// indexed, retries with the dynamic *object* field wrapper type
    /// (`Wrapper<name_type>`), since object fields are keyed by the wrapped type.
    /// Returns `Ok(None)` when neither derivation is present in the index.
    #[instrument(skip(self))]
    pub fn get_dynamic_field_object_id(
        &self,
        object: ObjectID,
        name_type: TypeTag,
        name_bcs_bytes: &[u8],
    ) -> SuiResult<Option<ObjectID>> {
        debug!(?object, "get_dynamic_field_object_id");
        let dynamic_field_id =
            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
                |e| {
                    SuiErrorKind::Unknown(format!(
                        "Unable to generate dynamic field id. Got error: {e:?}"
                    ))
                },
            )?;

        if let Some(info) = self
            .tables
            .dynamic_field_index
            .get(&(object, dynamic_field_id))?
        {
            // The indexed id normally equals the derived field id; the only other
            // legitimate case is a caller that already passed the object-field
            // wrapper type, where the index stores the wrapped object's id.
            debug_assert!(
                info.object_id == dynamic_field_id
                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
            );
            return Ok(Some(info.object_id));
        }

        // Fall back: re-derive using the dynamic object field wrapper type.
        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
            object,
            &dynamic_object_field_type,
            name_bcs_bytes,
        )
        .map_err(|e| {
            SuiErrorKind::Unknown(format!(
                "Unable to generate dynamic field id. Got error: {e:?}"
            ))
        })?;
        if let Some(info) = self
            .tables
            .dynamic_field_index
            .get(&(object, dynamic_object_field_id))?
        {
            return Ok(Some(info.object_id));
        }

        Ok(None)
    }
1882
1883 #[instrument(skip(self))]
1884 pub fn get_owner_objects(
1885 &self,
1886 owner: SuiAddress,
1887 cursor: Option<ObjectID>,
1888 limit: usize,
1889 filter: Option<SuiObjectDataFilter>,
1890 ) -> SuiResult<Vec<ObjectInfo>> {
1891 let cursor = match cursor {
1892 Some(cursor) => cursor,
1893 None => ObjectID::ZERO,
1894 };
1895 Ok(self
1896 .get_owner_objects_iterator(owner, cursor, filter)?
1897 .take(limit)
1898 .collect())
1899 }
1900
    /// Iterates the distinct coin types with a recorded address balance for `owner`.
    ///
    /// `(owner, TypeTag::Bool)` is used as the lower scan bound — presumably the
    /// minimal key for this owner under the serialized `TypeTag` ordering (TODO:
    /// confirm against the key encoding).
    /// NOTE(review): DB iterator errors panic here (`expect`) instead of being
    /// surfaced; the bare-iterator return type leaves no room to propagate them.
    pub fn get_address_balance_coin_types_iter(
        &self,
        owner: SuiAddress,
    ) -> impl Iterator<Item = TypeTag> {
        let start_key = (owner, TypeTag::Bool);
        self.tables()
            .address_balances
            .safe_iter_with_bounds(Some(start_key), None)
            .map(|result| result.expect("iterator db error"))
            // Stop once the scan leaves this owner's range.
            .take_while(move |(key, _)| key.0 == owner)
            .map(|(key, _)| key.1)
    }
1913
1914 pub fn get_owned_coins_iterator(
1915 coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1916 owner: SuiAddress,
1917 coin_type_tag: Option<String>,
1918 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1919 let all_coins = coin_type_tag.is_none();
1920 let starting_coin_type =
1921 coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1922 let start_key =
1923 CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1924 Ok(coin_index
1925 .safe_iter_with_bounds(Some(start_key), None)
1926 .map(|result| result.expect("iterator db error"))
1927 .take_while(move |(key, _)| {
1928 if key.owner != owner {
1929 return false;
1930 }
1931 if !all_coins && starting_coin_type != key.coin_type {
1932 return false;
1933 }
1934 true
1935 }))
1936 }
1937
1938 pub fn get_owned_coins_iterator_with_cursor(
1939 &self,
1940 owner: SuiAddress,
1941 cursor: (String, u64, ObjectID),
1942 limit: usize,
1943 one_coin_type_only: bool,
1944 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1945 let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1946 let start_key = CoinIndexKey2::new_from_cursor(
1947 owner,
1948 starting_coin_type.clone(),
1949 inverted_balance,
1950 starting_object_id,
1951 );
1952 Ok(self
1953 .tables
1954 .coin_index_2
1955 .safe_iter_with_bounds(Some(start_key), None)
1956 .map(|result| result.expect("iterator db error"))
1957 .filter(move |(key, _)| key.object_id != starting_object_id)
1958 .enumerate()
1959 .take_while(move |(index, (key, _))| {
1960 if *index >= limit {
1961 return false;
1962 }
1963 if key.owner != owner {
1964 return false;
1965 }
1966 if one_coin_type_only && starting_coin_type != key.coin_type {
1967 return false;
1968 }
1969 true
1970 })
1971 .map(|(_index, (key, info))| (key, info)))
1972 }
1973
1974 pub fn get_owner_objects_iterator(
1977 &self,
1978 owner: SuiAddress,
1979 starting_object_id: ObjectID,
1980 filter: Option<SuiObjectDataFilter>,
1981 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1982 Ok(self
1983 .tables
1984 .owner_index
1985 .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1987 .map(|result| result.expect("iterator db error"))
1988 .skip(usize::from(starting_object_id != ObjectID::ZERO))
1989 .take_while(move |((address_owner, _), _)| address_owner == &owner)
1990 .filter(move |(_, o)| {
1991 if let Some(filter) = filter.as_ref() {
1992 filter.matches(o)
1993 } else {
1994 true
1995 }
1996 })
1997 .map(|(_, object_info)| object_info))
1998 }
1999
2000 pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
2001 let mut batch = self.tables.owner_index.batch();
2002 batch.insert_batch(
2003 &self.tables.owner_index,
2004 object_index_changes.new_owners.into_iter(),
2005 )?;
2006 batch.insert_batch(
2007 &self.tables.dynamic_field_index,
2008 object_index_changes.new_dynamic_fields.into_iter(),
2009 )?;
2010 batch.write()?;
2011 Ok(())
2012 }
2013
    /// Returns true when the owner index has no entries.
    /// NOTE(review): only `owner_index` is checked — presumably a proxy for the
    /// whole store never having been populated; confirm against callers.
    pub fn is_empty(&self) -> bool {
        self.tables.owner_index.is_empty()
    }
2017
    /// Creates an on-disk checkpoint of the index DB at `path`.
    /// The call goes through the `transactions_from_addr` handle — the same
    /// arbitrary-handle pattern `new_db_batch` uses.
    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .transactions_from_addr
            .checkpoint_db(path)
            .map_err(Into::into)
    }
2025
2026 #[instrument(skip(self))]
2031 pub fn get_coin_object_balance(
2032 &self,
2033 owner: SuiAddress,
2034 coin_type: TypeTag,
2035 ) -> SuiResult<TotalBalance> {
2036 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2037 let cloned_coin_type = coin_type.clone();
2038 let metrics_cloned = self.metrics.clone();
2039 let coin_index_cloned = self.tables.coin_index_2.clone();
2040 if force_disable_cache {
2041 return Self::get_balance_from_db(
2042 metrics_cloned,
2043 coin_index_cloned,
2044 owner,
2045 cloned_coin_type,
2046 )
2047 .map_err(|e| {
2048 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2049 .into()
2050 });
2051 }
2052
2053 self.metrics.balance_lookup_from_total.inc();
2054
2055 let balance = self
2056 .caches
2057 .per_coin_type_balance
2058 .get(&(owner, coin_type.clone()));
2059 if let Some(balance) = balance {
2060 return balance;
2061 }
2062 let all_balance = self.caches.all_balances.get(&owner.clone());
2064 if let Some(Ok(all_balance)) = all_balance
2065 && let Some(balance) = all_balance.get(&coin_type)
2066 {
2067 return Ok(*balance);
2068 }
2069 let cloned_coin_type = coin_type.clone();
2070 let metrics_cloned = self.metrics.clone();
2071 let coin_index_cloned = self.tables.coin_index_2.clone();
2072 self.caches
2073 .per_coin_type_balance
2074 .get_with((owner, coin_type), move || {
2075 Self::get_balance_from_db(
2076 metrics_cloned,
2077 coin_index_cloned,
2078 owner,
2079 cloned_coin_type,
2080 )
2081 .map_err(|e| {
2082 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
2083 .into()
2084 })
2085 })
2086 }
2087
2088 #[instrument(skip(self))]
2094 pub fn get_all_coin_object_balances(
2095 &self,
2096 owner: SuiAddress,
2097 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2098 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
2099 let metrics_cloned = self.metrics.clone();
2100 let coin_index_cloned = self.tables.coin_index_2.clone();
2101 if force_disable_cache {
2102 return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
2103 .map_err(|e| {
2104 SuiErrorKind::ExecutionError(format!(
2105 "Failed to read all balance from DB: {:?}",
2106 e
2107 ))
2108 .into()
2109 });
2110 }
2111
2112 self.metrics.all_balance_lookup_from_total.inc();
2113 let metrics_cloned = self.metrics.clone();
2114 let coin_index_cloned = self.tables.coin_index_2.clone();
2115 self.caches.all_balances.get_with(owner, move || {
2116 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
2117 SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
2118 .into()
2119 })
2120 })
2121 }
2122
2123 #[instrument(skip_all)]
2125 pub fn get_balance_from_db(
2126 metrics: Arc<IndexStoreMetrics>,
2127 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2128 owner: SuiAddress,
2129 coin_type: TypeTag,
2130 ) -> SuiResult<TotalBalance> {
2131 metrics.balance_lookup_from_db.inc();
2132 let coin_type_str = coin_type.to_string();
2133 let coins =
2134 Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
2135
2136 let mut balance = 0i128;
2137 let mut num_coins = 0;
2138 for (_key, coin_info) in coins {
2139 balance += coin_info.balance as i128;
2140 num_coins += 1;
2141 }
2142 Ok(TotalBalance {
2143 balance,
2144 num_coins,
2145 address_balance: 0,
2146 })
2147 }
2148
2149 #[instrument(skip_all)]
2151 pub fn get_all_balances_from_db(
2152 metrics: Arc<IndexStoreMetrics>,
2153 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
2154 owner: SuiAddress,
2155 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2156 metrics.all_balance_lookup_from_db.inc();
2157 let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
2158 let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
2159 .chunk_by(|(key, _coin)| key.coin_type.clone());
2160 for (coin_type, coins) in &coins {
2161 let mut total_balance = 0i128;
2162 let mut coin_object_count = 0;
2163 for (_, coin_info) in coins {
2164 total_balance += coin_info.balance as i128;
2165 coin_object_count += 1;
2166 }
2167 let coin_type =
2168 TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
2169 SuiErrorKind::ExecutionError(format!(
2170 "Failed to parse event sender address: {:?}",
2171 e
2172 ))
2173 })?));
2174 balances.insert(
2175 coin_type,
2176 TotalBalance {
2177 num_coins: coin_object_count,
2178 balance: total_balance,
2179 address_balance: 0,
2180 },
2181 );
2182 }
2183 Ok(Arc::new(balances))
2184 }
2185
    /// Evicts the given `(owner, coin_type)` entries from the per-coin-type
    /// balance cache; subsequent reads repopulate from the DB.
    fn invalidate_per_coin_type_cache(
        &self,
        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
    ) -> SuiResult {
        self.caches.per_coin_type_balance.batch_invalidate(keys);
        Ok(())
    }
2193
    /// Evicts the all-balances cache entries for the given addresses; subsequent
    /// reads repopulate from the DB.
    fn invalidate_all_balance_cache(
        &self,
        addresses: impl IntoIterator<Item = SuiAddress>,
    ) -> SuiResult {
        self.caches.all_balances.batch_invalidate(addresses);
        Ok(())
    }
2201
    /// Merges balance deltas into the per-coin-type cache via `merge_balance`,
    /// keeping cached totals consistent without a DB round trip.
    fn update_per_coin_type_cache(
        &self,
        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    ) -> SuiResult {
        self.caches
            .per_coin_type_balance
            .batch_merge(keys, Self::merge_balance);
        Ok(())
    }
2211
2212 fn merge_balance(
2213 old_balance: &SuiResult<TotalBalance>,
2214 balance_delta: &SuiResult<TotalBalance>,
2215 ) -> SuiResult<TotalBalance> {
2216 if let Ok(old_balance) = old_balance {
2217 if let Ok(balance_delta) = balance_delta {
2218 Ok(TotalBalance {
2219 balance: old_balance.balance + balance_delta.balance,
2220 num_coins: old_balance.num_coins + balance_delta.num_coins,
2221 address_balance: old_balance.address_balance,
2222 })
2223 } else {
2224 balance_delta.clone()
2225 }
2226 } else {
2227 old_balance.clone()
2228 }
2229 }
2230
    /// Merges per-address balance-map deltas into the all-balances cache via
    /// `merge_all_balance`, keeping cached maps consistent without a DB round trip.
    fn update_all_balance_cache(
        &self,
        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
    ) -> SuiResult {
        self.caches
            .all_balances
            .batch_merge(keys, Self::merge_all_balance);
        Ok(())
    }
2240
2241 fn merge_all_balance(
2242 old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2243 balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
2244 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
2245 if let Ok(old_balance) = old_balance {
2246 if let Ok(balance_delta) = balance_delta {
2247 let mut new_balance = HashMap::new();
2248 for (key, value) in old_balance.iter() {
2249 new_balance.insert(key.clone(), *value);
2250 }
2251 for (key, delta) in balance_delta.iter() {
2252 let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
2253 balance: 0,
2254 num_coins: 0,
2255 address_balance: 0,
2256 });
2257 let new_total = TotalBalance {
2258 balance: old.balance + delta.balance,
2259 num_coins: old.num_coins + delta.num_coins,
2260 address_balance: old.address_balance,
2261 };
2262 new_balance.insert(key.clone(), new_total);
2263 }
2264 Ok(Arc::new(new_balance))
2265 } else {
2266 balance_delta.clone()
2267 }
2268 } else {
2269 old_balance.clone()
2270 }
2271 }
2272}
2273
#[cfg(test)]
mod tests {
    use super::{IndexStore, ObjectIndexChanges};
    use move_core_types::account_address::AccountAddress;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use std::env::temp_dir;
    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
    use sui_types::digests::TransactionDigest;
    use sui_types::effects::TransactionEvents;
    use sui_types::gas_coin::GAS;
    use sui_types::object::{self, Owner};

    // Exercises the balance caches: index ten fresh gas coins, then delete
    // three of them, checking after each commit that the cached balances
    // agree with values recomputed directly from the DB.
    #[tokio::test]
    async fn test_index_cache() -> anyhow::Result<()> {
        let store =
            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
        let owner: SuiAddress = AccountAddress::random().into();

        let mut written = BTreeMap::new();
        let mut inputs = BTreeMap::new();
        let mut by_id = BTreeMap::new();

        // Mint ten gas coins of 100 each, all owned by `owner`.
        let mut owner_entries = vec![];
        for _ in 0..10 {
            let coin = object::Object::new_gas_with_balance_and_owner_for_testing(100, owner);
            owner_entries.push((
                (owner, coin.id()),
                ObjectInfo {
                    object_id: coin.id(),
                    version: coin.version(),
                    digest: coin.digest(),
                    type_: ObjectType::Struct(coin.type_().unwrap().clone()),
                    owner: Owner::AddressOwner(owner),
                    previous_transaction: coin.previous_transaction,
                },
            ));
            by_id.insert(coin.id(), coin.clone());
            written.insert(coin.data.id(), coin);
        }
        let changes = ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners: owner_entries,
            new_dynamic_fields: vec![],
        };

        // First transaction: create all ten coins, then commit the batch.
        let coins = (inputs.clone(), written.clone());
        let seq = store.allocate_sequence_number();
        let (raw_batch, cache_updates) = store.index_tx(
            seq,
            owner,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            changes,
            &TransactionDigest::random(),
            1234,
            Some(coins),
            vec![],
            false,
        )?;
        let mut batch = store.new_db_batch();
        batch.concat(vec![raw_batch]).unwrap();
        store
            .commit_index_batch(batch, vec![cache_updates.into_inner()])
            .unwrap();

        // Cached per-coin-type balance must match a fresh read from the DB.
        let db_balance = IndexStore::get_balance_from_db(
            store.metrics.clone(),
            store.tables.coin_index_2.clone(),
            owner,
            GAS::type_tag(),
        )?;
        let cached = store.get_coin_object_balance(owner, GAS::type_tag())?;
        assert_eq!(cached, db_balance);
        assert_eq!(cached.balance, 1000);
        assert_eq!(cached.num_coins, 10);

        // The all-balances view must agree as well.
        let balances = store.get_all_coin_object_balances(owner)?;
        let gas_balance = balances.get(&GAS::type_tag()).unwrap();
        assert_eq!(*gas_balance, db_balance);
        assert_eq!(gas_balance.balance, 1000);
        assert_eq!(gas_balance.num_coins, 10);

        // Second transaction: delete the first three coins.
        written.clear();
        let mut removed = vec![];
        for (id, coin) in by_id.iter().take(3) {
            removed.push((owner, *id));
            inputs.insert(*id, coin.to_owned());
        }
        let changes = ObjectIndexChanges {
            deleted_owners: removed.clone(),
            deleted_dynamic_fields: vec![],
            new_owners: vec![],
            new_dynamic_fields: vec![],
        };
        let coins = (inputs, written);
        let seq = store.allocate_sequence_number();
        let (raw_batch, cache_updates) = store.index_tx(
            seq,
            owner,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            changes,
            &TransactionDigest::random(),
            1234,
            Some(coins),
            vec![],
            false,
        )?;
        let mut batch = store.new_db_batch();
        batch.concat(vec![raw_batch]).unwrap();
        store
            .commit_index_batch(batch, vec![cache_updates.into_inner()])
            .unwrap();

        let db_balance = IndexStore::get_balance_from_db(
            store.metrics.clone(),
            store.tables.coin_index_2.clone(),
            owner,
            GAS::type_tag(),
        )?;
        let cached = store.get_coin_object_balance(owner, GAS::type_tag())?;
        assert_eq!(cached, db_balance);
        assert_eq!(cached.balance, 700);
        assert_eq!(cached.num_coins, 7);

        // Invalidate the per-type cache entry and make sure both read paths
        // repopulate it with the post-deletion totals.
        store
            .caches
            .per_coin_type_balance
            .invalidate(&(owner, GAS::type_tag()));
        let balances = store.get_all_coin_object_balances(owner)?;
        assert_eq!(balances.get(&GAS::type_tag()).unwrap().balance, 700);
        assert_eq!(balances.get(&GAS::type_tag()).unwrap().num_coins, 7);
        let cached = store.get_coin_object_balance(owner, GAS::type_tag())?;
        assert_eq!(cached, db_balance);
        assert_eq!(cached.balance, 700);
        assert_eq!(cached.num_coins, 7);

        Ok(())
    }

    // Querying transactions by move function forwards and backwards over the
    // same key range must yield the same rows, just in reverse order.
    #[tokio::test]
    async fn test_get_transaction_by_move_function() {
        use sui_types::base_types::ObjectID;
        use typed_store::Map;

        let store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
        let table = &store.tables.transactions_by_move_function;
        // Mix of short and 128-char function names, upper- and lower-case,
        // to exercise key ordering near the scan-range boundaries.
        let rows: [(String, u8); 4] = [
            ("f".to_string(), 0),
            ("Z".repeat(128), 1),
            ("f".repeat(128), 2),
            ("z".repeat(128), 3),
        ];
        for (func, fill) in rows {
            table
                .insert(
                    &(ObjectID::new([1; 32]), "mod".to_string(), func, 0),
                    &[fill; 32].into(),
                )
                .unwrap();
        }

        let mut forward = store
            .get_transactions_by_move_function(
                ObjectID::new([1; 32]),
                Some("mod".to_string()),
                None,
                None,
                None,
                false,
            )
            .unwrap();
        let backward = store
            .get_transactions_by_move_function(
                ObjectID::new([1; 32]),
                Some("mod".to_string()),
                None,
                None,
                None,
                true,
            )
            .unwrap();
        forward.reverse();
        assert_eq!(forward, backward);
    }
}