1use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19 register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use typed_store::TypedStoreError;
23use typed_store::rocksdb::compaction_filter::Decision;
24
25use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
26use sui_storage::mutex_table::MutexTable;
27use sui_storage::sharded_lru::ShardedLruCache;
28use sui_types::base_types::{
29 ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
30};
31use sui_types::base_types::{ObjectInfo, ObjectRef};
32use sui_types::digests::TransactionEventsDigest;
33use sui_types::dynamic_field::{self, DynamicFieldInfo};
34use sui_types::effects::TransactionEvents;
35use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
36use sui_types::inner_temporary_store::TxCoins;
37use sui_types::object::{Object, Owner};
38use sui_types::parse_sui_struct_tag;
39use sui_types::storage::error::Error as StorageError;
40use tracing::{debug, info, instrument, trace};
41use typed_store::DBMapUtils;
42use typed_store::rocks::{
43 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
44 read_size_from_env,
45};
46use typed_store::traits::Map;
47
48type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
49
50type OwnerIndexKey = (SuiAddress, ObjectID);
51type DynamicFieldKey = (ObjectID, ObjectID);
52type EventId = (TxSequenceNumber, usize);
53type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
54type AllBalance = HashMap<TypeTag, TotalBalance>;
55
56#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
57pub struct CoinIndexKey2 {
58 pub owner: SuiAddress,
59 pub coin_type: String,
60 pub inverted_balance: u64,
63 pub object_id: ObjectID,
64}
65
66impl CoinIndexKey2 {
67 pub fn new_from_cursor(
68 owner: SuiAddress,
69 coin_type: String,
70 inverted_balance: u64,
71 object_id: ObjectID,
72 ) -> Self {
73 Self {
74 owner,
75 coin_type,
76 inverted_balance,
77 object_id,
78 }
79 }
80
81 pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
82 Self {
83 owner,
84 coin_type,
85 inverted_balance: !balance,
86 object_id,
87 }
88 }
89}
90
91const CURRENT_DB_VERSION: u64 = 0;
92const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
93
94#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
95struct MetadataInfo {
96 version: u64,
98 column_families: BTreeMap<String, ColumnFamilyInfo>,
103}
104
105#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
106struct ColumnFamilyInfo {
107 version: u64,
108}
109
110pub const MAX_TX_RANGE_SIZE: u64 = 4096;
111
112pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
113const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
114const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
115const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
116
117#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
118pub struct TotalBalance {
119 pub balance: i128,
120 pub num_coins: i64,
121}
122
123#[derive(Debug)]
124pub struct ObjectIndexChanges {
125 pub deleted_owners: Vec<OwnerIndexKey>,
126 pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
127 pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
128 pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
129}
130
131#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
132pub struct CoinInfo {
133 pub version: SequenceNumber,
134 pub digest: ObjectDigest,
135 pub balance: u64,
136 pub previous_transaction: TransactionDigest,
137}
138
139impl CoinInfo {
140 pub fn from_object(object: &Object) -> Option<CoinInfo> {
141 object.as_coin_maybe().map(|coin| CoinInfo {
142 version: object.version(),
143 digest: object.digest(),
144 previous_transaction: object.previous_transaction,
145 balance: coin.value(),
146 })
147 }
148}
149
150pub struct IndexStoreMetrics {
151 balance_lookup_from_db: IntCounter,
152 balance_lookup_from_total: IntCounter,
153 all_balance_lookup_from_db: IntCounter,
154 all_balance_lookup_from_total: IntCounter,
155}
156
157impl IndexStoreMetrics {
158 pub fn new(registry: &Registry) -> IndexStoreMetrics {
159 Self {
160 balance_lookup_from_db: register_int_counter_with_registry!(
161 "balance_lookup_from_db",
162 "Total number of balance requests served from cache",
163 registry,
164 )
165 .unwrap(),
166 balance_lookup_from_total: register_int_counter_with_registry!(
167 "balance_lookup_from_total",
168 "Total number of balance requests served ",
169 registry,
170 )
171 .unwrap(),
172 all_balance_lookup_from_db: register_int_counter_with_registry!(
173 "all_balance_lookup_from_db",
174 "Total number of all balance requests served from cache",
175 registry,
176 )
177 .unwrap(),
178 all_balance_lookup_from_total: register_int_counter_with_registry!(
179 "all_balance_lookup_from_total",
180 "Total number of all balance requests served",
181 registry,
182 )
183 .unwrap(),
184 }
185 }
186}
187
188pub struct IndexStoreCaches {
189 per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
190 all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
191 pub locks: MutexTable<SuiAddress>,
192}
193
194#[derive(Default)]
195pub struct IndexStoreCacheUpdates {
196 _locks: Vec<OwnedMutexGuard<()>>,
197 per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
198 all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
199}
200
201#[derive(DBMapUtils)]
202pub struct IndexStoreTables {
203 meta: DBMap<(), MetadataInfo>,
210
211 transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
213
214 transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
216
217 #[deprecated]
219 transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
220
221 #[deprecated]
223 transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
224
225 transactions_by_move_function:
227 DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,
228
229 transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,
231
232 transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,
234
235 owner_index: DBMap<OwnerIndexKey, ObjectInfo>,
240
241 coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
242
243 dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,
248
249 event_order: DBMap<EventId, EventIndex>,
250 event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
251 event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
252 event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
253 event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
254 event_by_time: DBMap<(u64, EventId), EventIndex>,
255
256 pruner_watermark: DBMap<(), TxSequenceNumber>,
257}
258
259impl IndexStoreTables {
260 pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
261 &self.owner_index
262 }
263
264 pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
265 &self.coin_index_2
266 }
267
268 #[allow(deprecated)]
269 fn init(&mut self) -> Result<(), StorageError> {
270 let metadata = {
271 match self.meta.get(&()) {
272 Ok(Some(metadata)) => metadata,
273 Ok(None) | Err(_) => MetadataInfo {
274 version: CURRENT_DB_VERSION,
275 column_families: BTreeMap::new(),
276 },
277 }
278 };
279
280 self.meta.insert(&(), &metadata)?;
282
283 Ok(())
284 }
285
286 pub fn get_dynamic_fields_iterator(
287 &self,
288 object: ObjectID,
289 cursor: Option<ObjectID>,
290 ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
291 debug!(?object, "get_dynamic_fields");
292 let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
294 let iter_upper_bound = (object, ObjectID::MAX);
295 self.dynamic_field_index
296 .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
297 .skip(usize::from(cursor.is_some()))
299 .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
300 .map_ok(|((_, c), object_info)| (c, object_info))
301 }
302}
303
304pub struct IndexStore {
305 next_sequence_number: AtomicU64,
306 tables: IndexStoreTables,
307 pub caches: IndexStoreCaches,
308 metrics: Arc<IndexStoreMetrics>,
309 max_type_length: u64,
310 remove_deprecated_tables: bool,
311 pruner_watermark: Arc<AtomicU64>,
312}
313
314struct JsonRpcCompactionMetrics {
315 key_removed: IntCounterVec,
316 key_kept: IntCounterVec,
317 key_error: IntCounterVec,
318}
319
320impl JsonRpcCompactionMetrics {
321 pub fn new(registry: &Registry) -> Arc<Self> {
322 Arc::new(Self {
323 key_removed: register_int_counter_vec_with_registry!(
324 "json_rpc_compaction_filter_key_removed",
325 "Compaction key removed",
326 &["cf"],
327 registry
328 )
329 .unwrap(),
330 key_kept: register_int_counter_vec_with_registry!(
331 "json_rpc_compaction_filter_key_kept",
332 "Compaction key kept",
333 &["cf"],
334 registry
335 )
336 .unwrap(),
337 key_error: register_int_counter_vec_with_registry!(
338 "json_rpc_compaction_filter_key_error",
339 "Compaction error",
340 &["cf"],
341 registry
342 )
343 .unwrap(),
344 })
345 }
346}
347
348fn compaction_filter_config<T: DeserializeOwned>(
349 name: &str,
350 metrics: Arc<JsonRpcCompactionMetrics>,
351 mut db_options: DBOptions,
352 pruner_watermark: Arc<AtomicU64>,
353 extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
354 by_key: bool,
355) -> DBOptions {
356 let cf = name.to_string();
357 db_options
358 .options
359 .set_compaction_filter(name, move |_, key, value| {
360 let bytes = if by_key { key } else { value };
361 let deserializer = bincode::DefaultOptions::new()
362 .with_big_endian()
363 .with_fixint_encoding();
364 match deserializer.deserialize(bytes) {
365 Ok(key_data) => {
366 let sequence_number = extractor(key_data);
367 if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
368 metrics.key_removed.with_label_values(&[&cf]).inc();
369 Decision::Remove
370 } else {
371 metrics.key_kept.with_label_values(&[&cf]).inc();
372 Decision::Keep
373 }
374 }
375 Err(_) => {
376 metrics.key_error.with_label_values(&[&cf]).inc();
377 Decision::Keep
378 }
379 }
380 });
381 db_options
382}
383
384fn compaction_filter_config_by_key<T: DeserializeOwned>(
385 name: &str,
386 metrics: Arc<JsonRpcCompactionMetrics>,
387 db_options: DBOptions,
388 pruner_watermark: Arc<AtomicU64>,
389 extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
390) -> DBOptions {
391 compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
392}
393
394fn coin_index_table_default_config() -> DBOptions {
395 default_db_options()
396 .optimize_for_write_throughput()
397 .optimize_for_read(
398 read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
399 )
400 .disable_write_throttling()
401}
402
403impl IndexStore {
404 pub fn new_without_init(
405 path: PathBuf,
406 registry: &Registry,
407 max_type_length: Option<u64>,
408 remove_deprecated_tables: bool,
409 ) -> Self {
410 let db_options = default_db_options().disable_write_throttling();
411 let pruner_watermark = Arc::new(AtomicU64::new(0));
412 let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
413 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
414 (
415 "transactions_from_addr".to_string(),
416 compaction_filter_config_by_key(
417 "transactions_from_addr",
418 compaction_metrics.clone(),
419 db_options.clone(),
420 pruner_watermark.clone(),
421 |(_, id): (SuiAddress, TxSequenceNumber)| id,
422 ),
423 ),
424 (
425 "transactions_to_addr".to_string(),
426 compaction_filter_config_by_key(
427 "transactions_to_addr",
428 compaction_metrics.clone(),
429 db_options.clone(),
430 pruner_watermark.clone(),
431 |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
432 ),
433 ),
434 (
435 "transactions_by_move_function".to_string(),
436 compaction_filter_config_by_key(
437 "transactions_by_move_function",
438 compaction_metrics.clone(),
439 db_options.clone(),
440 pruner_watermark.clone(),
441 |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
442 ),
443 ),
444 (
445 "transaction_order".to_string(),
446 compaction_filter_config_by_key(
447 "transaction_order",
448 compaction_metrics.clone(),
449 db_options.clone(),
450 pruner_watermark.clone(),
451 |sequence_number: TxSequenceNumber| sequence_number,
452 ),
453 ),
454 (
455 "transactions_seq".to_string(),
456 compaction_filter_config(
457 "transactions_seq",
458 compaction_metrics.clone(),
459 db_options.clone(),
460 pruner_watermark.clone(),
461 |sequence_number: TxSequenceNumber| sequence_number,
462 false,
463 ),
464 ),
465 (
466 "coin_index_2".to_string(),
467 coin_index_table_default_config(),
468 ),
469 (
470 "event_order".to_string(),
471 compaction_filter_config_by_key(
472 "event_order",
473 compaction_metrics.clone(),
474 db_options.clone(),
475 pruner_watermark.clone(),
476 |event_id: EventId| event_id.0,
477 ),
478 ),
479 (
480 "event_by_move_module".to_string(),
481 compaction_filter_config_by_key(
482 "event_by_move_module",
483 compaction_metrics.clone(),
484 db_options.clone(),
485 pruner_watermark.clone(),
486 |(_, event_id): (ModuleId, EventId)| event_id.0,
487 ),
488 ),
489 (
490 "event_by_event_module".to_string(),
491 compaction_filter_config_by_key(
492 "event_by_event_module",
493 compaction_metrics.clone(),
494 db_options.clone(),
495 pruner_watermark.clone(),
496 |(_, event_id): (ModuleId, EventId)| event_id.0,
497 ),
498 ),
499 (
500 "event_by_sender".to_string(),
501 compaction_filter_config_by_key(
502 "event_by_sender",
503 compaction_metrics.clone(),
504 db_options.clone(),
505 pruner_watermark.clone(),
506 |(_, event_id): (SuiAddress, EventId)| event_id.0,
507 ),
508 ),
509 (
510 "event_by_time".to_string(),
511 compaction_filter_config_by_key(
512 "event_by_time",
513 compaction_metrics.clone(),
514 db_options.clone(),
515 pruner_watermark.clone(),
516 |(_, event_id): (u64, EventId)| event_id.0,
517 ),
518 ),
519 ]));
520 let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
521 path,
522 MetricConf::new("index"),
523 Some(db_options.options),
524 Some(table_options),
525 remove_deprecated_tables,
526 );
527
528 let metrics = IndexStoreMetrics::new(registry);
529 let caches = IndexStoreCaches {
530 per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
531 all_balances: ShardedLruCache::new(1_000_000, 1000),
532 locks: MutexTable::new(128),
533 };
534 let next_sequence_number = tables
535 .transaction_order
536 .reversed_safe_iter_with_bounds(None, None)
537 .expect("failed to initialize indexes")
538 .next()
539 .transpose()
540 .expect("failed to initialize indexes")
541 .map(|(seq, _)| seq + 1)
542 .unwrap_or(0)
543 .into();
544 let pruner_watermark_value = tables
545 .pruner_watermark
546 .get(&())
547 .expect("failed to initialize index tables")
548 .unwrap_or(0);
549 pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
550
551 Self {
552 tables,
553 next_sequence_number,
554 caches,
555 metrics: Arc::new(metrics),
556 max_type_length: max_type_length.unwrap_or(128),
557 remove_deprecated_tables,
558 pruner_watermark,
559 }
560 }
561
562 pub fn new(
563 path: PathBuf,
564 registry: &Registry,
565 max_type_length: Option<u64>,
566 remove_deprecated_tables: bool,
567 ) -> Self {
568 let mut store =
569 Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
570 store.tables.init().unwrap();
571 store
572 }
573
574 pub fn tables(&self) -> &IndexStoreTables {
575 &self.tables
576 }
577
578 #[instrument(skip_all)]
579 pub fn index_coin(
580 &self,
581 digest: &TransactionDigest,
582 batch: &mut DBBatch,
583 object_index_changes: &ObjectIndexChanges,
584 tx_coins: Option<TxCoins>,
585 ) -> SuiResult<IndexStoreCacheUpdates> {
586 if tx_coins.is_none() {
590 return Ok(IndexStoreCacheUpdates::default());
591 }
592 let mut addresses: HashSet<SuiAddress> = HashSet::new();
594 addresses.extend(
595 object_index_changes
596 .deleted_owners
597 .iter()
598 .map(|(owner, _)| *owner),
599 );
600 addresses.extend(
601 object_index_changes
602 .new_owners
603 .iter()
604 .map(|((owner, _), _)| *owner),
605 );
606 let _locks = self.caches.locks.acquire_locks(addresses.into_iter());
607 let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
608 HashMap::new();
609 let (input_coins, written_coins) = tx_coins.unwrap();
611
612 let coin_delete_keys = input_coins
614 .values()
615 .filter_map(|object| {
616 let Owner::AddressOwner(owner) = object.owner() else {
618 return None;
619 };
620
621 let (coin_type, coin) = object
623 .coin_type_maybe()
624 .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
625
626 let key = CoinIndexKey2::new(
627 *owner,
628 coin_type.to_string(),
629 coin.balance.value(),
630 object.id(),
631 );
632
633 let map = balance_changes.entry(*owner).or_default();
634 let entry = map.entry(coin_type).or_insert(TotalBalance {
635 num_coins: 0,
636 balance: 0,
637 });
638 entry.num_coins -= 1;
639 entry.balance -= coin.balance.value() as i128;
640
641 Some(key)
642 })
643 .collect::<Vec<_>>();
644 trace!(
645 tx_digset=?digest,
646 "coin_delete_keys: {:?}",
647 coin_delete_keys,
648 );
649 batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
650
651 let coin_add_keys = written_coins
653 .values()
654 .filter_map(|object| {
655 let Owner::AddressOwner(owner) = object.owner() else {
657 return None;
658 };
659
660 let (coin_type, coin) = object
662 .coin_type_maybe()
663 .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
664
665 let key = CoinIndexKey2::new(
666 *owner,
667 coin_type.to_string(),
668 coin.balance.value(),
669 object.id(),
670 );
671 let value = CoinInfo {
672 version: object.version(),
673 digest: object.digest(),
674 balance: coin.balance.value(),
675 previous_transaction: object.previous_transaction,
676 };
677 let map = balance_changes.entry(*owner).or_default();
678 let entry = map.entry(coin_type).or_insert(TotalBalance {
679 num_coins: 0,
680 balance: 0,
681 });
682 entry.num_coins += 1;
683 entry.balance += coin.balance.value() as i128;
684
685 Some((key, value))
686 })
687 .collect::<Vec<_>>();
688 trace!(
689 tx_digset=?digest,
690 "coin_add_keys: {:?}",
691 coin_add_keys,
692 );
693
694 batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
695
696 let per_coin_type_balance_changes: Vec<_> = balance_changes
697 .iter()
698 .flat_map(|(address, balance_map)| {
699 balance_map.iter().map(|(type_tag, balance)| {
700 (
701 (*address, type_tag.clone()),
702 Ok::<TotalBalance, SuiError>(*balance),
703 )
704 })
705 })
706 .collect();
707 let all_balance_changes: Vec<_> = balance_changes
708 .into_iter()
709 .map(|(address, balance_map)| {
710 (
711 address,
712 Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
713 )
714 })
715 .collect();
716 let cache_updates = IndexStoreCacheUpdates {
717 _locks,
718 per_coin_type_balance_changes,
719 all_balance_changes,
720 };
721 Ok(cache_updates)
722 }
723
724 #[instrument(skip_all)]
725 pub fn index_tx(
726 &self,
727 sender: SuiAddress,
728 active_inputs: impl Iterator<Item = ObjectID>,
729 mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
730 move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
731 events: &TransactionEvents,
732 object_index_changes: ObjectIndexChanges,
733 digest: &TransactionDigest,
734 timestamp_ms: u64,
735 tx_coins: Option<TxCoins>,
736 ) -> SuiResult<u64> {
737 let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
738 let mut batch = self.tables.transactions_from_addr.batch();
739
740 batch.insert_batch(
741 &self.tables.transaction_order,
742 std::iter::once((sequence, *digest)),
743 )?;
744
745 batch.insert_batch(
746 &self.tables.transactions_seq,
747 std::iter::once((*digest, sequence)),
748 )?;
749
750 batch.insert_batch(
751 &self.tables.transactions_from_addr,
752 std::iter::once(((sender, sequence), *digest)),
753 )?;
754
755 #[allow(deprecated)]
756 if !self.remove_deprecated_tables {
757 batch.insert_batch(
758 &self.tables.transactions_by_input_object_id,
759 active_inputs.map(|id| ((id, sequence), *digest)),
760 )?;
761
762 batch.insert_batch(
763 &self.tables.transactions_by_mutated_object_id,
764 mutated_objects
765 .clone()
766 .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
767 )?;
768 }
769
770 batch.insert_batch(
771 &self.tables.transactions_by_move_function,
772 move_functions
773 .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
774 )?;
775
776 batch.insert_batch(
777 &self.tables.transactions_to_addr,
778 mutated_objects.filter_map(|(_, owner)| {
779 owner
780 .get_address_owner_address()
781 .ok()
782 .map(|addr| ((addr, sequence), digest))
783 }),
784 )?;
785
786 let cache_updates = self.index_coin(digest, &mut batch, &object_index_changes, tx_coins)?;
788
789 batch.delete_batch(
791 &self.tables.owner_index,
792 object_index_changes.deleted_owners.into_iter(),
793 )?;
794 batch.delete_batch(
795 &self.tables.dynamic_field_index,
796 object_index_changes.deleted_dynamic_fields.into_iter(),
797 )?;
798
799 batch.insert_batch(
800 &self.tables.owner_index,
801 object_index_changes.new_owners.into_iter(),
802 )?;
803
804 batch.insert_batch(
805 &self.tables.dynamic_field_index,
806 object_index_changes.new_dynamic_fields.into_iter(),
807 )?;
808
809 let event_digest = events.digest();
811 batch.insert_batch(
812 &self.tables.event_order,
813 events
814 .data
815 .iter()
816 .enumerate()
817 .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
818 )?;
819 batch.insert_batch(
820 &self.tables.event_by_move_module,
821 events
822 .data
823 .iter()
824 .enumerate()
825 .map(|(i, e)| {
826 (
827 i,
828 ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
829 )
830 })
831 .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
832 )?;
833 batch.insert_batch(
834 &self.tables.event_by_sender,
835 events.data.iter().enumerate().map(|(i, e)| {
836 (
837 (e.sender, (sequence, i)),
838 (event_digest, *digest, timestamp_ms),
839 )
840 }),
841 )?;
842 batch.insert_batch(
843 &self.tables.event_by_move_event,
844 events.data.iter().enumerate().map(|(i, e)| {
845 (
846 (e.type_.clone(), (sequence, i)),
847 (event_digest, *digest, timestamp_ms),
848 )
849 }),
850 )?;
851
852 batch.insert_batch(
853 &self.tables.event_by_time,
854 events.data.iter().enumerate().map(|(i, _)| {
855 (
856 (timestamp_ms, (sequence, i)),
857 (event_digest, *digest, timestamp_ms),
858 )
859 }),
860 )?;
861
862 batch.insert_batch(
863 &self.tables.event_by_event_module,
864 events.data.iter().enumerate().map(|(i, e)| {
865 (
866 (
867 ModuleId::new(e.type_.address, e.type_.module.clone()),
868 (sequence, i),
869 ),
870 (event_digest, *digest, timestamp_ms),
871 )
872 }),
873 )?;
874
875 let invalidate_caches =
876 read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
877
878 if invalidate_caches {
879 self.invalidate_per_coin_type_cache(
881 cache_updates
882 .per_coin_type_balance_changes
883 .iter()
884 .map(|x| x.0.clone()),
885 )?;
886 self.invalidate_all_balance_cache(
887 cache_updates.all_balance_changes.iter().map(|x| x.0),
888 )?;
889 }
890
891 batch.write()?;
892
893 if !invalidate_caches {
894 self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)?;
898 self.update_all_balance_cache(cache_updates.all_balance_changes)?;
899 }
900 Ok(sequence)
901 }
902
903 pub fn next_sequence_number(&self) -> TxSequenceNumber {
904 self.next_sequence_number.load(Ordering::SeqCst) + 1
905 }
906
907 #[instrument(skip(self))]
908 pub fn get_transactions(
909 &self,
910 filter: Option<TransactionFilter>,
911 cursor: Option<TransactionDigest>,
912 limit: Option<usize>,
913 reverse: bool,
914 ) -> SuiResult<Vec<TransactionDigest>> {
915 let cursor = if let Some(cursor) = cursor {
917 Some(
918 self.get_transaction_seq(&cursor)?
919 .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
920 )
921 } else {
922 None
923 };
924 match filter {
925 Some(TransactionFilter::MoveFunction {
926 package,
927 module,
928 function,
929 }) => Ok(self.get_transactions_by_move_function(
930 package, module, function, cursor, limit, reverse,
931 )?),
932 Some(TransactionFilter::InputObject(object_id)) => {
933 Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
934 }
935 Some(TransactionFilter::ChangedObject(object_id)) => {
936 Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
937 }
938 Some(TransactionFilter::FromAddress(address)) => {
939 Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
940 }
941 Some(TransactionFilter::ToAddress(address)) => {
942 Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
943 }
944 Some(_) => Err(SuiErrorKind::UserInputError {
947 error: UserInputError::Unsupported(format!("{:?}", filter)),
948 }
949 .into()),
950 None => {
951 if reverse {
952 let iter = self
953 .tables
954 .transaction_order
955 .reversed_safe_iter_with_bounds(
956 None,
957 Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
958 )?
959 .skip(usize::from(cursor.is_some()))
960 .map(|result| result.map(|(_, digest)| digest));
961 if let Some(limit) = limit {
962 Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
963 } else {
964 Ok(iter.collect::<Result<Vec<_>, _>>()?)
965 }
966 } else {
967 let iter = self
968 .tables
969 .transaction_order
970 .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
971 .skip(usize::from(cursor.is_some()))
972 .map(|result| result.map(|(_, digest)| digest));
973 if let Some(limit) = limit {
974 Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
975 } else {
976 Ok(iter.collect::<Result<Vec<_>, _>>()?)
977 }
978 }
979 }
980 }
981 }
982
983 #[instrument(skip_all)]
984 fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
985 index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
986 key: KeyT,
987 cursor: Option<TxSequenceNumber>,
988 limit: Option<usize>,
989 reverse: bool,
990 ) -> SuiResult<Vec<TransactionDigest>> {
991 Ok(if reverse {
992 let iter = index
993 .reversed_safe_iter_with_bounds(
994 None,
995 Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
996 )?
997 .skip(usize::from(cursor.is_some()))
999 .take_while(|result| {
1000 result
1001 .as_ref()
1002 .map(|((id, _), _)| *id == key)
1003 .unwrap_or(false)
1004 })
1005 .map(|result| result.map(|(_, digest)| digest));
1006 if let Some(limit) = limit {
1007 iter.take(limit).collect::<Result<Vec<_>, _>>()?
1008 } else {
1009 iter.collect::<Result<Vec<_>, _>>()?
1010 }
1011 } else {
1012 let iter = index
1013 .safe_iter_with_bounds(
1014 Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
1015 None,
1016 )
1017 .skip(usize::from(cursor.is_some()))
1019 .map(|result| result.expect("iterator db error"))
1020 .take_while(|((id, _), _)| *id == key)
1021 .map(|(_, digest)| digest);
1022 if let Some(limit) = limit {
1023 iter.take(limit).collect()
1024 } else {
1025 iter.collect()
1026 }
1027 })
1028 }
1029
1030 #[instrument(skip(self))]
1031 pub fn get_transactions_by_input_object(
1032 &self,
1033 input_object: ObjectID,
1034 cursor: Option<TxSequenceNumber>,
1035 limit: Option<usize>,
1036 reverse: bool,
1037 ) -> SuiResult<Vec<TransactionDigest>> {
1038 if self.remove_deprecated_tables {
1039 return Ok(vec![]);
1040 }
1041 #[allow(deprecated)]
1042 Self::get_transactions_from_index(
1043 &self.tables.transactions_by_input_object_id,
1044 input_object,
1045 cursor,
1046 limit,
1047 reverse,
1048 )
1049 }
1050
1051 #[instrument(skip(self))]
1052 pub fn get_transactions_by_mutated_object(
1053 &self,
1054 mutated_object: ObjectID,
1055 cursor: Option<TxSequenceNumber>,
1056 limit: Option<usize>,
1057 reverse: bool,
1058 ) -> SuiResult<Vec<TransactionDigest>> {
1059 if self.remove_deprecated_tables {
1060 return Ok(vec![]);
1061 }
1062 #[allow(deprecated)]
1063 Self::get_transactions_from_index(
1064 &self.tables.transactions_by_mutated_object_id,
1065 mutated_object,
1066 cursor,
1067 limit,
1068 reverse,
1069 )
1070 }
1071
1072 #[instrument(skip(self))]
1073 pub fn get_transactions_from_addr(
1074 &self,
1075 addr: SuiAddress,
1076 cursor: Option<TxSequenceNumber>,
1077 limit: Option<usize>,
1078 reverse: bool,
1079 ) -> SuiResult<Vec<TransactionDigest>> {
1080 Self::get_transactions_from_index(
1081 &self.tables.transactions_from_addr,
1082 addr,
1083 cursor,
1084 limit,
1085 reverse,
1086 )
1087 }
1088
1089 #[instrument(skip(self))]
1090 pub fn get_transactions_by_move_function(
1091 &self,
1092 package: ObjectID,
1093 module: Option<String>,
1094 function: Option<String>,
1095 cursor: Option<TxSequenceNumber>,
1096 limit: Option<usize>,
1097 reverse: bool,
1098 ) -> SuiResult<Vec<TransactionDigest>> {
1099 if function.is_some() && module.is_none() {
1101 return Err(SuiErrorKind::UserInputError {
1102 error: UserInputError::MoveFunctionInputError(
1103 "Cannot supply function without supplying module".to_string(),
1104 ),
1105 }
1106 .into());
1107 }
1108
1109 if cursor.is_some() && (module.is_none() || function.is_none()) {
1111 return Err(SuiErrorKind::UserInputError {
1112 error: UserInputError::MoveFunctionInputError(
1113 "Cannot supply cursor without supplying module and function".to_string(),
1114 ),
1115 }
1116 .into());
1117 }
1118
1119 let cursor_val = cursor.unwrap_or(if reverse {
1120 TxSequenceNumber::MAX
1121 } else {
1122 TxSequenceNumber::MIN
1123 });
1124
1125 let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1126 let module_val = module.clone().unwrap_or(if reverse {
1127 max_string.clone()
1128 } else {
1129 "".to_string()
1130 });
1131
1132 let function_val =
1133 function
1134 .clone()
1135 .unwrap_or(if reverse { max_string } else { "".to_string() });
1136
1137 let key = (package, module_val, function_val, cursor_val);
1138 Ok(if reverse {
1139 let iter = self
1140 .tables
1141 .transactions_by_move_function
1142 .reversed_safe_iter_with_bounds(None, Some(key))?
1143 .skip(usize::from(cursor.is_some()))
1145 .take_while(|result| {
1146 result
1147 .as_ref()
1148 .map(|((id, m, f, _), _)| {
1149 *id == package
1150 && module.as_ref().map(|x| x == m).unwrap_or(true)
1151 && function.as_ref().map(|x| x == f).unwrap_or(true)
1152 })
1153 .unwrap_or(false)
1154 })
1155 .map(|result| result.map(|(_, digest)| digest));
1156 if let Some(limit) = limit {
1157 iter.take(limit).collect::<Result<Vec<_>, _>>()?
1158 } else {
1159 iter.collect::<Result<Vec<_>, _>>()?
1160 }
1161 } else {
1162 let iter = self
1163 .tables
1164 .transactions_by_move_function
1165 .safe_iter_with_bounds(Some(key), None)
1166 .map(|result| result.expect("iterator db error"))
1167 .skip(usize::from(cursor.is_some()))
1169 .take_while(|((id, m, f, _), _)| {
1170 *id == package
1171 && module.as_ref().map(|x| x == m).unwrap_or(true)
1172 && function.as_ref().map(|x| x == f).unwrap_or(true)
1173 })
1174 .map(|(_, digest)| digest);
1175 if let Some(limit) = limit {
1176 iter.take(limit).collect()
1177 } else {
1178 iter.collect()
1179 }
1180 })
1181 }
1182
1183 #[instrument(skip(self))]
1184 pub fn get_transactions_to_addr(
1185 &self,
1186 addr: SuiAddress,
1187 cursor: Option<TxSequenceNumber>,
1188 limit: Option<usize>,
1189 reverse: bool,
1190 ) -> SuiResult<Vec<TransactionDigest>> {
1191 Self::get_transactions_from_index(
1192 &self.tables.transactions_to_addr,
1193 addr,
1194 cursor,
1195 limit,
1196 reverse,
1197 )
1198 }
1199
1200 #[instrument(skip(self))]
1201 pub fn get_transaction_seq(
1202 &self,
1203 digest: &TransactionDigest,
1204 ) -> SuiResult<Option<TxSequenceNumber>> {
1205 Ok(self.tables.transactions_seq.get(digest)?)
1206 }
1207
1208 #[instrument(skip(self))]
1209 pub fn all_events(
1210 &self,
1211 tx_seq: TxSequenceNumber,
1212 event_seq: usize,
1213 limit: usize,
1214 descending: bool,
1215 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1216 Ok(if descending {
1217 self.tables
1218 .event_order
1219 .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1220 .take(limit)
1221 .map(|result| {
1222 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1223 (digest, tx_digest, event_seq, time)
1224 })
1225 })
1226 .collect::<Result<Vec<_>, _>>()?
1227 } else {
1228 self.tables
1229 .event_order
1230 .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1231 .take(limit)
1232 .map(|result| {
1233 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1234 (digest, tx_digest, event_seq, time)
1235 })
1236 })
1237 .collect::<Result<Vec<_>, _>>()?
1238 })
1239 }
1240
1241 #[instrument(skip(self))]
1242 pub fn events_by_transaction(
1243 &self,
1244 digest: &TransactionDigest,
1245 tx_seq: TxSequenceNumber,
1246 event_seq: usize,
1247 limit: usize,
1248 descending: bool,
1249 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1250 let seq = self
1251 .get_transaction_seq(digest)?
1252 .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1253 Ok(if descending {
1254 self.tables
1255 .event_order
1256 .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1257 .take_while(|result| {
1258 result
1259 .as_ref()
1260 .map(|((tx, _), _)| tx == &seq)
1261 .unwrap_or(false)
1262 })
1263 .take(limit)
1264 .map(|result| {
1265 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1266 (digest, tx_digest, event_seq, time)
1267 })
1268 })
1269 .collect::<Result<Vec<_>, _>>()?
1270 } else {
1271 self.tables
1272 .event_order
1273 .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1274 .map(|result| result.expect("iterator db error"))
1275 .take_while(|((tx, _), _)| tx == &seq)
1276 .take(limit)
1277 .map(|((_, event_seq), (digest, tx_digest, time))| {
1278 (digest, tx_digest, event_seq, time)
1279 })
1280 .collect()
1281 })
1282 }
1283
1284 #[instrument(skip_all)]
1285 fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1286 index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1287 key: &KeyT,
1288 tx_seq: TxSequenceNumber,
1289 event_seq: usize,
1290 limit: usize,
1291 descending: bool,
1292 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1293 Ok(if descending {
1294 index
1295 .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1296 .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1297 .take(limit)
1298 .map(|result| {
1299 result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1300 (digest, tx_digest, event_seq, time)
1301 })
1302 })
1303 .collect::<Result<Vec<_>, _>>()?
1304 } else {
1305 index
1306 .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1307 .map(|result| result.expect("iterator db error"))
1308 .take_while(|((m, _), _)| m == key)
1309 .take(limit)
1310 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1311 (digest, tx_digest, event_seq, time)
1312 })
1313 .collect()
1314 })
1315 }
1316
1317 #[instrument(skip(self))]
1318 pub fn events_by_module_id(
1319 &self,
1320 module: &ModuleId,
1321 tx_seq: TxSequenceNumber,
1322 event_seq: usize,
1323 limit: usize,
1324 descending: bool,
1325 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1326 Self::get_event_from_index(
1327 &self.tables.event_by_move_module,
1328 module,
1329 tx_seq,
1330 event_seq,
1331 limit,
1332 descending,
1333 )
1334 }
1335
1336 #[instrument(skip(self))]
1337 pub fn events_by_move_event_struct_name(
1338 &self,
1339 struct_name: &StructTag,
1340 tx_seq: TxSequenceNumber,
1341 event_seq: usize,
1342 limit: usize,
1343 descending: bool,
1344 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1345 Self::get_event_from_index(
1346 &self.tables.event_by_move_event,
1347 struct_name,
1348 tx_seq,
1349 event_seq,
1350 limit,
1351 descending,
1352 )
1353 }
1354
1355 #[instrument(skip(self))]
1356 pub fn events_by_move_event_module(
1357 &self,
1358 module_id: &ModuleId,
1359 tx_seq: TxSequenceNumber,
1360 event_seq: usize,
1361 limit: usize,
1362 descending: bool,
1363 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1364 Self::get_event_from_index(
1365 &self.tables.event_by_event_module,
1366 module_id,
1367 tx_seq,
1368 event_seq,
1369 limit,
1370 descending,
1371 )
1372 }
1373
1374 #[instrument(skip(self))]
1375 pub fn events_by_sender(
1376 &self,
1377 sender: &SuiAddress,
1378 tx_seq: TxSequenceNumber,
1379 event_seq: usize,
1380 limit: usize,
1381 descending: bool,
1382 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1383 Self::get_event_from_index(
1384 &self.tables.event_by_sender,
1385 sender,
1386 tx_seq,
1387 event_seq,
1388 limit,
1389 descending,
1390 )
1391 }
1392
1393 #[instrument(skip(self))]
1394 pub fn event_iterator(
1395 &self,
1396 start_time: u64,
1397 end_time: u64,
1398 tx_seq: TxSequenceNumber,
1399 event_seq: usize,
1400 limit: usize,
1401 descending: bool,
1402 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1403 Ok(if descending {
1404 self.tables
1405 .event_by_time
1406 .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1407 .take_while(|result| {
1408 result
1409 .as_ref()
1410 .map(|((m, _), _)| m >= &start_time)
1411 .unwrap_or(false)
1412 })
1413 .take(limit)
1414 .map(|result| {
1415 result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1416 (digest, tx_digest, event_seq, time)
1417 })
1418 })
1419 .collect::<Result<Vec<_>, _>>()?
1420 } else {
1421 self.tables
1422 .event_by_time
1423 .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1424 .map(|result| result.expect("iterator db error"))
1425 .take_while(|((m, _), _)| m <= &end_time)
1426 .take(limit)
1427 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1428 (digest, tx_digest, event_seq, time)
1429 })
1430 .collect()
1431 })
1432 }
1433
1434 pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1435 match self
1436 .tables
1437 .event_by_time
1438 .reversed_safe_iter_with_bounds(
1439 None,
1440 Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1441 )?
1442 .next()
1443 .transpose()?
1444 {
1445 Some(((_, (watermark, _)), _)) => {
1446 if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1447 info!(
1448 "json rpc index pruning. Watermark is {} with digest {}",
1449 watermark, digest
1450 );
1451 }
1452 self.pruner_watermark.store(watermark, Ordering::Relaxed);
1453 self.tables.pruner_watermark.insert(&(), &watermark)?;
1454 Ok(watermark)
1455 }
1456 None => Ok(0),
1457 }
1458 }
1459
1460 pub fn get_dynamic_fields_iterator(
1461 &self,
1462 object: ObjectID,
1463 cursor: Option<ObjectID>,
1464 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1465 {
1466 Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
1467 }
1468
1469 #[instrument(skip(self))]
1470 pub fn get_dynamic_field_object_id(
1471 &self,
1472 object: ObjectID,
1473 name_type: TypeTag,
1474 name_bcs_bytes: &[u8],
1475 ) -> SuiResult<Option<ObjectID>> {
1476 debug!(?object, "get_dynamic_field_object_id");
1477 let dynamic_field_id =
1478 dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1479 |e| {
1480 SuiErrorKind::Unknown(format!(
1481 "Unable to generate dynamic field id. Got error: {e:?}"
1482 ))
1483 },
1484 )?;
1485
1486 if let Some(info) = self
1487 .tables
1488 .dynamic_field_index
1489 .get(&(object, dynamic_field_id))?
1490 {
1491 debug_assert!(
1493 info.object_id == dynamic_field_id
1494 || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1495 );
1496 return Ok(Some(info.object_id));
1497 }
1498
1499 let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1500 let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1501 let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1502 object,
1503 &dynamic_object_field_type,
1504 name_bcs_bytes,
1505 )
1506 .map_err(|e| {
1507 SuiErrorKind::Unknown(format!(
1508 "Unable to generate dynamic field id. Got error: {e:?}"
1509 ))
1510 })?;
1511 if let Some(info) = self
1512 .tables
1513 .dynamic_field_index
1514 .get(&(object, dynamic_object_field_id))?
1515 {
1516 return Ok(Some(info.object_id));
1517 }
1518
1519 Ok(None)
1520 }
1521
1522 #[instrument(skip(self))]
1523 pub fn get_owner_objects(
1524 &self,
1525 owner: SuiAddress,
1526 cursor: Option<ObjectID>,
1527 limit: usize,
1528 filter: Option<SuiObjectDataFilter>,
1529 ) -> SuiResult<Vec<ObjectInfo>> {
1530 let cursor = match cursor {
1531 Some(cursor) => cursor,
1532 None => ObjectID::ZERO,
1533 };
1534 Ok(self
1535 .get_owner_objects_iterator(owner, cursor, filter)?
1536 .take(limit)
1537 .collect())
1538 }
1539
1540 pub fn get_owned_coins_iterator(
1541 coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1542 owner: SuiAddress,
1543 coin_type_tag: Option<String>,
1544 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1545 let all_coins = coin_type_tag.is_none();
1546 let starting_coin_type =
1547 coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1548 let start_key =
1549 CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1550 Ok(coin_index
1551 .safe_iter_with_bounds(Some(start_key), None)
1552 .map(|result| result.expect("iterator db error"))
1553 .take_while(move |(key, _)| {
1554 if key.owner != owner {
1555 return false;
1556 }
1557 if !all_coins && starting_coin_type != key.coin_type {
1558 return false;
1559 }
1560 true
1561 }))
1562 }
1563
1564 pub fn get_owned_coins_iterator_with_cursor(
1565 &self,
1566 owner: SuiAddress,
1567 cursor: (String, u64, ObjectID),
1568 limit: usize,
1569 one_coin_type_only: bool,
1570 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1571 let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1572 let start_key = CoinIndexKey2::new_from_cursor(
1573 owner,
1574 starting_coin_type.clone(),
1575 inverted_balance,
1576 starting_object_id,
1577 );
1578 Ok(self
1579 .tables
1580 .coin_index_2
1581 .safe_iter_with_bounds(Some(start_key), None)
1582 .map(|result| result.expect("iterator db error"))
1583 .filter(move |(key, _)| key.object_id != starting_object_id)
1584 .enumerate()
1585 .take_while(move |(index, (key, _))| {
1586 if *index >= limit {
1587 return false;
1588 }
1589 if key.owner != owner {
1590 return false;
1591 }
1592 if one_coin_type_only && starting_coin_type != key.coin_type {
1593 return false;
1594 }
1595 true
1596 })
1597 .map(|(_index, (key, info))| (key, info)))
1598 }
1599
1600 pub fn get_owner_objects_iterator(
1603 &self,
1604 owner: SuiAddress,
1605 starting_object_id: ObjectID,
1606 filter: Option<SuiObjectDataFilter>,
1607 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1608 Ok(self
1609 .tables
1610 .owner_index
1611 .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1613 .map(|result| result.expect("iterator db error"))
1614 .skip(usize::from(starting_object_id != ObjectID::ZERO))
1615 .take_while(move |((address_owner, _), _)| address_owner == &owner)
1616 .filter(move |(_, o)| {
1617 if let Some(filter) = filter.as_ref() {
1618 filter.matches(o)
1619 } else {
1620 true
1621 }
1622 })
1623 .map(|(_, object_info)| object_info))
1624 }
1625
1626 pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
1627 let mut batch = self.tables.owner_index.batch();
1628 batch.insert_batch(
1629 &self.tables.owner_index,
1630 object_index_changes.new_owners.into_iter(),
1631 )?;
1632 batch.insert_batch(
1633 &self.tables.dynamic_field_index,
1634 object_index_changes.new_dynamic_fields.into_iter(),
1635 )?;
1636 batch.write()?;
1637 Ok(())
1638 }
1639
1640 pub fn is_empty(&self) -> bool {
1641 self.tables.owner_index.is_empty()
1642 }
1643
1644 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1645 self.tables
1647 .transactions_from_addr
1648 .checkpoint_db(path)
1649 .map_err(Into::into)
1650 }
1651
1652 #[instrument(skip(self))]
1657 pub fn get_coin_object_balance(
1658 &self,
1659 owner: SuiAddress,
1660 coin_type: TypeTag,
1661 ) -> SuiResult<TotalBalance> {
1662 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1663 let cloned_coin_type = coin_type.clone();
1664 let metrics_cloned = self.metrics.clone();
1665 let coin_index_cloned = self.tables.coin_index_2.clone();
1666 if force_disable_cache {
1667 return Self::get_balance_from_db(
1668 metrics_cloned,
1669 coin_index_cloned,
1670 owner,
1671 cloned_coin_type,
1672 )
1673 .map_err(|e| {
1674 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1675 .into()
1676 });
1677 }
1678
1679 self.metrics.balance_lookup_from_total.inc();
1680
1681 let balance = self
1682 .caches
1683 .per_coin_type_balance
1684 .get(&(owner, coin_type.clone()));
1685 if let Some(balance) = balance {
1686 return balance;
1687 }
1688 let all_balance = self.caches.all_balances.get(&owner.clone());
1690 if let Some(Ok(all_balance)) = all_balance
1691 && let Some(balance) = all_balance.get(&coin_type)
1692 {
1693 return Ok(*balance);
1694 }
1695 let cloned_coin_type = coin_type.clone();
1696 let metrics_cloned = self.metrics.clone();
1697 let coin_index_cloned = self.tables.coin_index_2.clone();
1698 self.caches
1699 .per_coin_type_balance
1700 .get_with((owner, coin_type), move || {
1701 Self::get_balance_from_db(
1702 metrics_cloned,
1703 coin_index_cloned,
1704 owner,
1705 cloned_coin_type,
1706 )
1707 .map_err(|e| {
1708 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1709 .into()
1710 })
1711 })
1712 }
1713
1714 #[instrument(skip(self))]
1720 pub fn get_all_coin_object_balances(
1721 &self,
1722 owner: SuiAddress,
1723 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1724 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1725 let metrics_cloned = self.metrics.clone();
1726 let coin_index_cloned = self.tables.coin_index_2.clone();
1727 if force_disable_cache {
1728 return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
1729 .map_err(|e| {
1730 SuiErrorKind::ExecutionError(format!(
1731 "Failed to read all balance from DB: {:?}",
1732 e
1733 ))
1734 .into()
1735 });
1736 }
1737
1738 self.metrics.all_balance_lookup_from_total.inc();
1739 let metrics_cloned = self.metrics.clone();
1740 let coin_index_cloned = self.tables.coin_index_2.clone();
1741 self.caches.all_balances.get_with(owner, move || {
1742 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
1743 SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
1744 .into()
1745 })
1746 })
1747 }
1748
1749 #[instrument(skip_all)]
1751 pub fn get_balance_from_db(
1752 metrics: Arc<IndexStoreMetrics>,
1753 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1754 owner: SuiAddress,
1755 coin_type: TypeTag,
1756 ) -> SuiResult<TotalBalance> {
1757 metrics.balance_lookup_from_db.inc();
1758 let coin_type_str = coin_type.to_string();
1759 let coins =
1760 Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
1761
1762 let mut balance = 0i128;
1763 let mut num_coins = 0;
1764 for (_key, coin_info) in coins {
1765 balance += coin_info.balance as i128;
1766 num_coins += 1;
1767 }
1768 Ok(TotalBalance { balance, num_coins })
1769 }
1770
1771 #[instrument(skip_all)]
1773 pub fn get_all_balances_from_db(
1774 metrics: Arc<IndexStoreMetrics>,
1775 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1776 owner: SuiAddress,
1777 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1778 metrics.all_balance_lookup_from_db.inc();
1779 let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1780 let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1781 .chunk_by(|(key, _coin)| key.coin_type.clone());
1782 for (coin_type, coins) in &coins {
1783 let mut total_balance = 0i128;
1784 let mut coin_object_count = 0;
1785 for (_, coin_info) in coins {
1786 total_balance += coin_info.balance as i128;
1787 coin_object_count += 1;
1788 }
1789 let coin_type =
1790 TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
1791 SuiErrorKind::ExecutionError(format!(
1792 "Failed to parse event sender address: {:?}",
1793 e
1794 ))
1795 })?));
1796 balances.insert(
1797 coin_type,
1798 TotalBalance {
1799 num_coins: coin_object_count,
1800 balance: total_balance,
1801 },
1802 );
1803 }
1804 Ok(Arc::new(balances))
1805 }
1806
1807 fn invalidate_per_coin_type_cache(
1808 &self,
1809 keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
1810 ) -> SuiResult {
1811 self.caches.per_coin_type_balance.batch_invalidate(keys);
1812 Ok(())
1813 }
1814
1815 fn invalidate_all_balance_cache(
1816 &self,
1817 addresses: impl IntoIterator<Item = SuiAddress>,
1818 ) -> SuiResult {
1819 self.caches.all_balances.batch_invalidate(addresses);
1820 Ok(())
1821 }
1822
1823 fn update_per_coin_type_cache(
1824 &self,
1825 keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
1826 ) -> SuiResult {
1827 self.caches
1828 .per_coin_type_balance
1829 .batch_merge(keys, Self::merge_balance);
1830 Ok(())
1831 }
1832
1833 fn merge_balance(
1834 old_balance: &SuiResult<TotalBalance>,
1835 balance_delta: &SuiResult<TotalBalance>,
1836 ) -> SuiResult<TotalBalance> {
1837 if let Ok(old_balance) = old_balance {
1838 if let Ok(balance_delta) = balance_delta {
1839 Ok(TotalBalance {
1840 balance: old_balance.balance + balance_delta.balance,
1841 num_coins: old_balance.num_coins + balance_delta.num_coins,
1842 })
1843 } else {
1844 balance_delta.clone()
1845 }
1846 } else {
1847 old_balance.clone()
1848 }
1849 }
1850
1851 fn update_all_balance_cache(
1852 &self,
1853 keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1854 ) -> SuiResult {
1855 self.caches
1856 .all_balances
1857 .batch_merge(keys, Self::merge_all_balance);
1858 Ok(())
1859 }
1860
1861 fn merge_all_balance(
1862 old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1863 balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1864 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1865 if let Ok(old_balance) = old_balance {
1866 if let Ok(balance_delta) = balance_delta {
1867 let mut new_balance = HashMap::new();
1868 for (key, value) in old_balance.iter() {
1869 new_balance.insert(key.clone(), *value);
1870 }
1871 for (key, delta) in balance_delta.iter() {
1872 let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1873 balance: 0,
1874 num_coins: 0,
1875 });
1876 let new_total = TotalBalance {
1877 balance: old.balance + delta.balance,
1878 num_coins: old.num_coins + delta.num_coins,
1879 };
1880 new_balance.insert(key.clone(), new_total);
1881 }
1882 Ok(Arc::new(new_balance))
1883 } else {
1884 balance_delta.clone()
1885 }
1886 } else {
1887 old_balance.clone()
1888 }
1889 }
1890}
1891
1892#[cfg(test)]
1893mod tests {
1894 use super::IndexStore;
1895 use super::ObjectIndexChanges;
1896 use move_core_types::account_address::AccountAddress;
1897 use prometheus::Registry;
1898 use std::collections::BTreeMap;
1899 use std::env::temp_dir;
1900 use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
1901 use sui_types::digests::TransactionDigest;
1902 use sui_types::effects::TransactionEvents;
1903 use sui_types::gas_coin::GAS;
1904 use sui_types::object;
1905 use sui_types::object::Owner;
1906
1907 #[tokio::test]
1908 async fn test_index_cache() -> anyhow::Result<()> {
1909 let index_store =
1917 IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
1918 let address: SuiAddress = AccountAddress::random().into();
1919 let mut written_objects = BTreeMap::new();
1920 let mut input_objects = BTreeMap::new();
1921 let mut object_map = BTreeMap::new();
1922
1923 let mut new_objects = vec![];
1924 for _i in 0..10 {
1925 let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
1926 new_objects.push((
1927 (address, object.id()),
1928 ObjectInfo {
1929 object_id: object.id(),
1930 version: object.version(),
1931 digest: object.digest(),
1932 type_: ObjectType::Struct(object.type_().unwrap().clone()),
1933 owner: Owner::AddressOwner(address),
1934 previous_transaction: object.previous_transaction,
1935 },
1936 ));
1937 object_map.insert(object.id(), object.clone());
1938 written_objects.insert(object.data.id(), object);
1939 }
1940 let object_index_changes = ObjectIndexChanges {
1941 deleted_owners: vec![],
1942 deleted_dynamic_fields: vec![],
1943 new_owners: new_objects,
1944 new_dynamic_fields: vec![],
1945 };
1946
1947 let tx_coins = (input_objects.clone(), written_objects.clone());
1948 index_store.index_tx(
1949 address,
1950 vec![].into_iter(),
1951 vec![].into_iter(),
1952 vec![].into_iter(),
1953 &TransactionEvents { data: vec![] },
1954 object_index_changes,
1955 &TransactionDigest::random(),
1956 1234,
1957 Some(tx_coins),
1958 )?;
1959
1960 let balance_from_db = IndexStore::get_balance_from_db(
1961 index_store.metrics.clone(),
1962 index_store.tables.coin_index_2.clone(),
1963 address,
1964 GAS::type_tag(),
1965 )?;
1966 let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
1967 assert_eq!(balance, balance_from_db);
1968 assert_eq!(balance.balance, 1000);
1969 assert_eq!(balance.num_coins, 10);
1970
1971 let all_balance = index_store.get_all_coin_object_balances(address)?;
1972 let balance = all_balance.get(&GAS::type_tag()).unwrap();
1973 assert_eq!(*balance, balance_from_db);
1974 assert_eq!(balance.balance, 1000);
1975 assert_eq!(balance.num_coins, 10);
1976
1977 written_objects.clear();
1978 let mut deleted_objects = vec![];
1979 for (id, object) in object_map.iter().take(3) {
1980 deleted_objects.push((address, *id));
1981 input_objects.insert(*id, object.to_owned());
1982 }
1983 let object_index_changes = ObjectIndexChanges {
1984 deleted_owners: deleted_objects.clone(),
1985 deleted_dynamic_fields: vec![],
1986 new_owners: vec![],
1987 new_dynamic_fields: vec![],
1988 };
1989 let tx_coins = (input_objects, written_objects);
1990 index_store.index_tx(
1991 address,
1992 vec![].into_iter(),
1993 vec![].into_iter(),
1994 vec![].into_iter(),
1995 &TransactionEvents { data: vec![] },
1996 object_index_changes,
1997 &TransactionDigest::random(),
1998 1234,
1999 Some(tx_coins),
2000 )?;
2001 let balance_from_db = IndexStore::get_balance_from_db(
2002 index_store.metrics.clone(),
2003 index_store.tables.coin_index_2.clone(),
2004 address,
2005 GAS::type_tag(),
2006 )?;
2007 let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2008 assert_eq!(balance, balance_from_db);
2009 assert_eq!(balance.balance, 700);
2010 assert_eq!(balance.num_coins, 7);
2011 index_store
2014 .caches
2015 .per_coin_type_balance
2016 .invalidate(&(address, GAS::type_tag()));
2017 let all_balance = index_store.get_all_coin_object_balances(address)?;
2018 assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
2019 assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
2020 let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
2021 assert_eq!(balance, balance_from_db);
2022 assert_eq!(balance.balance, 700);
2023 assert_eq!(balance.num_coins, 7);
2024
2025 Ok(())
2026 }
2027
2028 #[tokio::test]
2029 async fn test_get_transaction_by_move_function() {
2030 use sui_types::base_types::ObjectID;
2031 use typed_store::Map;
2032
2033 let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
2034 let db = &index_store.tables.transactions_by_move_function;
2035 db.insert(
2036 &(
2037 ObjectID::new([1; 32]),
2038 "mod".to_string(),
2039 "f".to_string(),
2040 0,
2041 ),
2042 &[0; 32].into(),
2043 )
2044 .unwrap();
2045 db.insert(
2046 &(
2047 ObjectID::new([1; 32]),
2048 "mod".to_string(),
2049 "Z".repeat(128),
2050 0,
2051 ),
2052 &[1; 32].into(),
2053 )
2054 .unwrap();
2055 db.insert(
2056 &(
2057 ObjectID::new([1; 32]),
2058 "mod".to_string(),
2059 "f".repeat(128),
2060 0,
2061 ),
2062 &[2; 32].into(),
2063 )
2064 .unwrap();
2065 db.insert(
2066 &(
2067 ObjectID::new([1; 32]),
2068 "mod".to_string(),
2069 "z".repeat(128),
2070 0,
2071 ),
2072 &[3; 32].into(),
2073 )
2074 .unwrap();
2075
2076 let mut v = index_store
2077 .get_transactions_by_move_function(
2078 ObjectID::new([1; 32]),
2079 Some("mod".to_string()),
2080 None,
2081 None,
2082 None,
2083 false,
2084 )
2085 .unwrap();
2086 let v_rev = index_store
2087 .get_transactions_by_move_function(
2088 ObjectID::new([1; 32]),
2089 Some("mod".to_string()),
2090 None,
2091 None,
2092 None,
2093 true,
2094 )
2095 .unwrap();
2096 v.reverse();
2097 assert_eq!(v, v_rev);
2098 }
2099}