1use std::cmp::{max, min};
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13use bincode::Options;
14use itertools::Itertools;
15use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
16use parking_lot::ArcMutexGuard;
17use prometheus::{
18 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
19 register_int_counter_with_registry,
20};
21use serde::{Deserialize, Serialize, de::DeserializeOwned};
22use typed_store::TypedStoreError;
23use typed_store::rocksdb::compaction_filter::Decision;
24
25use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
26use sui_storage::mutex_table::MutexTable;
27use sui_storage::sharded_lru::ShardedLruCache;
28use sui_types::base_types::{
29 ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
30};
31use sui_types::base_types::{ObjectInfo, ObjectRef};
32use sui_types::digests::TransactionEventsDigest;
33use sui_types::dynamic_field::{self, DynamicFieldInfo};
34use sui_types::effects::TransactionEvents;
35use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
36use sui_types::inner_temporary_store::TxCoins;
37use sui_types::object::{Object, Owner};
38use sui_types::parse_sui_struct_tag;
39use sui_types::storage::error::Error as StorageError;
40use tracing::{debug, info, instrument, trace};
41use typed_store::DBMapUtils;
42use typed_store::rocks::{
43 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
44 read_size_from_env,
45};
46use typed_store::traits::Map;
47
48type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;
49
50type OwnerIndexKey = (SuiAddress, ObjectID);
51type DynamicFieldKey = (ObjectID, ObjectID);
52type EventId = (TxSequenceNumber, usize);
53type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
54type AllBalance = HashMap<TypeTag, TotalBalance>;
55
56#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
57pub struct CoinIndexKey2 {
58 pub owner: SuiAddress,
59 pub coin_type: String,
60 pub inverted_balance: u64,
63 pub object_id: ObjectID,
64}
65
66impl CoinIndexKey2 {
67 pub fn new_from_cursor(
68 owner: SuiAddress,
69 coin_type: String,
70 inverted_balance: u64,
71 object_id: ObjectID,
72 ) -> Self {
73 Self {
74 owner,
75 coin_type,
76 inverted_balance,
77 object_id,
78 }
79 }
80
81 pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
82 Self {
83 owner,
84 coin_type,
85 inverted_balance: !balance,
86 object_id,
87 }
88 }
89}
90
91const CURRENT_DB_VERSION: u64 = 0;
92const _CURRENT_COIN_INDEX_VERSION: u64 = 1;
93
94#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
95struct MetadataInfo {
96 version: u64,
98 column_families: BTreeMap<String, ColumnFamilyInfo>,
103}
104
105#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
106struct ColumnFamilyInfo {
107 version: u64,
108}
109
110pub const MAX_TX_RANGE_SIZE: u64 = 4096;
111
112pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
113const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
114const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
115const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";
116
117#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
118pub struct TotalBalance {
119 pub balance: i128,
120 pub num_coins: i64,
121}
122
123#[derive(Debug)]
124pub struct ObjectIndexChanges {
125 pub deleted_owners: Vec<OwnerIndexKey>,
126 pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
127 pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
128 pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
129}
130
131#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
132pub struct CoinInfo {
133 pub version: SequenceNumber,
134 pub digest: ObjectDigest,
135 pub balance: u64,
136 pub previous_transaction: TransactionDigest,
137}
138
139impl CoinInfo {
140 pub fn from_object(object: &Object) -> Option<CoinInfo> {
141 object.as_coin_maybe().map(|coin| CoinInfo {
142 version: object.version(),
143 digest: object.digest(),
144 previous_transaction: object.previous_transaction,
145 balance: coin.value(),
146 })
147 }
148}
149
150pub struct IndexStoreMetrics {
151 balance_lookup_from_db: IntCounter,
152 balance_lookup_from_total: IntCounter,
153 all_balance_lookup_from_db: IntCounter,
154 all_balance_lookup_from_total: IntCounter,
155}
156
157impl IndexStoreMetrics {
158 pub fn new(registry: &Registry) -> IndexStoreMetrics {
159 Self {
160 balance_lookup_from_db: register_int_counter_with_registry!(
161 "balance_lookup_from_db",
162 "Total number of balance requests served from cache",
163 registry,
164 )
165 .unwrap(),
166 balance_lookup_from_total: register_int_counter_with_registry!(
167 "balance_lookup_from_total",
168 "Total number of balance requests served ",
169 registry,
170 )
171 .unwrap(),
172 all_balance_lookup_from_db: register_int_counter_with_registry!(
173 "all_balance_lookup_from_db",
174 "Total number of all balance requests served from cache",
175 registry,
176 )
177 .unwrap(),
178 all_balance_lookup_from_total: register_int_counter_with_registry!(
179 "all_balance_lookup_from_total",
180 "Total number of all balance requests served",
181 registry,
182 )
183 .unwrap(),
184 }
185 }
186}
187
188pub struct IndexStoreCaches {
189 per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
190 all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
191 pub locks: MutexTable<SuiAddress>,
192}
193
194#[derive(Default)]
195pub struct IndexStoreCacheUpdates {
196 _locks: Vec<OwnedMutexGuard<()>>,
197 per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
198 all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
199}
200
201#[derive(DBMapUtils)]
202pub struct IndexStoreTables {
203 meta: DBMap<(), MetadataInfo>,
210
211 transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
213
214 transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,
216
217 #[deprecated]
219 transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
220
221 #[deprecated]
223 transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,
224
225 transactions_by_move_function:
227 DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,
228
229 transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,
231
232 transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,
234
235 owner_index: DBMap<OwnerIndexKey, ObjectInfo>,
240
241 coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
242
243 dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,
248
249 event_order: DBMap<EventId, EventIndex>,
250 event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
251 event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
252 event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
253 event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
254 event_by_time: DBMap<(u64, EventId), EventIndex>,
255
256 pruner_watermark: DBMap<(), TxSequenceNumber>,
257}
258
259impl IndexStoreTables {
260 pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
261 &self.owner_index
262 }
263
264 pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
265 &self.coin_index_2
266 }
267
268 #[allow(deprecated)]
269 fn init(&mut self) -> Result<(), StorageError> {
270 let metadata = {
271 match self.meta.get(&()) {
272 Ok(Some(metadata)) => metadata,
273 Ok(None) | Err(_) => MetadataInfo {
274 version: CURRENT_DB_VERSION,
275 column_families: BTreeMap::new(),
276 },
277 }
278 };
279
280 self.meta.insert(&(), &metadata)?;
282
283 Ok(())
284 }
285}
286
287pub struct IndexStore {
288 next_sequence_number: AtomicU64,
289 tables: IndexStoreTables,
290 pub caches: IndexStoreCaches,
291 metrics: Arc<IndexStoreMetrics>,
292 max_type_length: u64,
293 remove_deprecated_tables: bool,
294 pruner_watermark: Arc<AtomicU64>,
295}
296
297struct JsonRpcCompactionMetrics {
298 key_removed: IntCounterVec,
299 key_kept: IntCounterVec,
300 key_error: IntCounterVec,
301}
302
303impl JsonRpcCompactionMetrics {
304 pub fn new(registry: &Registry) -> Arc<Self> {
305 Arc::new(Self {
306 key_removed: register_int_counter_vec_with_registry!(
307 "json_rpc_compaction_filter_key_removed",
308 "Compaction key removed",
309 &["cf"],
310 registry
311 )
312 .unwrap(),
313 key_kept: register_int_counter_vec_with_registry!(
314 "json_rpc_compaction_filter_key_kept",
315 "Compaction key kept",
316 &["cf"],
317 registry
318 )
319 .unwrap(),
320 key_error: register_int_counter_vec_with_registry!(
321 "json_rpc_compaction_filter_key_error",
322 "Compaction error",
323 &["cf"],
324 registry
325 )
326 .unwrap(),
327 })
328 }
329}
330
331fn compaction_filter_config<T: DeserializeOwned>(
332 name: &str,
333 metrics: Arc<JsonRpcCompactionMetrics>,
334 mut db_options: DBOptions,
335 pruner_watermark: Arc<AtomicU64>,
336 extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
337 by_key: bool,
338) -> DBOptions {
339 let cf = name.to_string();
340 db_options
341 .options
342 .set_compaction_filter(name, move |_, key, value| {
343 let bytes = if by_key { key } else { value };
344 let deserializer = bincode::DefaultOptions::new()
345 .with_big_endian()
346 .with_fixint_encoding();
347 match deserializer.deserialize(bytes) {
348 Ok(key_data) => {
349 let sequence_number = extractor(key_data);
350 if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
351 metrics.key_removed.with_label_values(&[&cf]).inc();
352 Decision::Remove
353 } else {
354 metrics.key_kept.with_label_values(&[&cf]).inc();
355 Decision::Keep
356 }
357 }
358 Err(_) => {
359 metrics.key_error.with_label_values(&[&cf]).inc();
360 Decision::Keep
361 }
362 }
363 });
364 db_options
365}
366
367fn compaction_filter_config_by_key<T: DeserializeOwned>(
368 name: &str,
369 metrics: Arc<JsonRpcCompactionMetrics>,
370 db_options: DBOptions,
371 pruner_watermark: Arc<AtomicU64>,
372 extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
373) -> DBOptions {
374 compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
375}
376
377fn coin_index_table_default_config() -> DBOptions {
378 default_db_options()
379 .optimize_for_write_throughput()
380 .optimize_for_read(
381 read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
382 )
383 .disable_write_throttling()
384}
385
386impl IndexStore {
387 pub fn new_without_init(
388 path: PathBuf,
389 registry: &Registry,
390 max_type_length: Option<u64>,
391 remove_deprecated_tables: bool,
392 ) -> Self {
393 let db_options = default_db_options().disable_write_throttling();
394 let pruner_watermark = Arc::new(AtomicU64::new(0));
395 let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
396 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
397 (
398 "transactions_from_addr".to_string(),
399 compaction_filter_config_by_key(
400 "transactions_from_addr",
401 compaction_metrics.clone(),
402 db_options.clone(),
403 pruner_watermark.clone(),
404 |(_, id): (SuiAddress, TxSequenceNumber)| id,
405 ),
406 ),
407 (
408 "transactions_to_addr".to_string(),
409 compaction_filter_config_by_key(
410 "transactions_to_addr",
411 compaction_metrics.clone(),
412 db_options.clone(),
413 pruner_watermark.clone(),
414 |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
415 ),
416 ),
417 (
418 "transactions_by_move_function".to_string(),
419 compaction_filter_config_by_key(
420 "transactions_by_move_function",
421 compaction_metrics.clone(),
422 db_options.clone(),
423 pruner_watermark.clone(),
424 |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
425 ),
426 ),
427 (
428 "transaction_order".to_string(),
429 compaction_filter_config_by_key(
430 "transaction_order",
431 compaction_metrics.clone(),
432 db_options.clone(),
433 pruner_watermark.clone(),
434 |sequence_number: TxSequenceNumber| sequence_number,
435 ),
436 ),
437 (
438 "transactions_seq".to_string(),
439 compaction_filter_config(
440 "transactions_seq",
441 compaction_metrics.clone(),
442 db_options.clone(),
443 pruner_watermark.clone(),
444 |sequence_number: TxSequenceNumber| sequence_number,
445 false,
446 ),
447 ),
448 (
449 "coin_index_2".to_string(),
450 coin_index_table_default_config(),
451 ),
452 (
453 "event_order".to_string(),
454 compaction_filter_config_by_key(
455 "event_order",
456 compaction_metrics.clone(),
457 db_options.clone(),
458 pruner_watermark.clone(),
459 |event_id: EventId| event_id.0,
460 ),
461 ),
462 (
463 "event_by_move_module".to_string(),
464 compaction_filter_config_by_key(
465 "event_by_move_module",
466 compaction_metrics.clone(),
467 db_options.clone(),
468 pruner_watermark.clone(),
469 |(_, event_id): (ModuleId, EventId)| event_id.0,
470 ),
471 ),
472 (
473 "event_by_event_module".to_string(),
474 compaction_filter_config_by_key(
475 "event_by_event_module",
476 compaction_metrics.clone(),
477 db_options.clone(),
478 pruner_watermark.clone(),
479 |(_, event_id): (ModuleId, EventId)| event_id.0,
480 ),
481 ),
482 (
483 "event_by_sender".to_string(),
484 compaction_filter_config_by_key(
485 "event_by_sender",
486 compaction_metrics.clone(),
487 db_options.clone(),
488 pruner_watermark.clone(),
489 |(_, event_id): (SuiAddress, EventId)| event_id.0,
490 ),
491 ),
492 (
493 "event_by_time".to_string(),
494 compaction_filter_config_by_key(
495 "event_by_time",
496 compaction_metrics.clone(),
497 db_options.clone(),
498 pruner_watermark.clone(),
499 |(_, event_id): (u64, EventId)| event_id.0,
500 ),
501 ),
502 ]));
503 let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
504 path,
505 MetricConf::new("index"),
506 Some(db_options.options),
507 Some(table_options),
508 remove_deprecated_tables,
509 );
510
511 let metrics = IndexStoreMetrics::new(registry);
512 let caches = IndexStoreCaches {
513 per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
514 all_balances: ShardedLruCache::new(1_000_000, 1000),
515 locks: MutexTable::new(128),
516 };
517 let next_sequence_number = tables
518 .transaction_order
519 .reversed_safe_iter_with_bounds(None, None)
520 .expect("failed to initialize indexes")
521 .next()
522 .transpose()
523 .expect("failed to initialize indexes")
524 .map(|(seq, _)| seq + 1)
525 .unwrap_or(0)
526 .into();
527 let pruner_watermark_value = tables
528 .pruner_watermark
529 .get(&())
530 .expect("failed to initialize index tables")
531 .unwrap_or(0);
532 pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);
533
534 Self {
535 tables,
536 next_sequence_number,
537 caches,
538 metrics: Arc::new(metrics),
539 max_type_length: max_type_length.unwrap_or(128),
540 remove_deprecated_tables,
541 pruner_watermark,
542 }
543 }
544
545 pub fn new(
546 path: PathBuf,
547 registry: &Registry,
548 max_type_length: Option<u64>,
549 remove_deprecated_tables: bool,
550 ) -> Self {
551 let mut store =
552 Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
553 store.tables.init().unwrap();
554 store
555 }
556
557 pub fn tables(&self) -> &IndexStoreTables {
558 &self.tables
559 }
560
561 #[instrument(skip_all)]
562 pub fn index_coin(
563 &self,
564 digest: &TransactionDigest,
565 batch: &mut DBBatch,
566 object_index_changes: &ObjectIndexChanges,
567 tx_coins: Option<TxCoins>,
568 ) -> SuiResult<IndexStoreCacheUpdates> {
569 if tx_coins.is_none() {
573 return Ok(IndexStoreCacheUpdates::default());
574 }
575 let mut addresses: HashSet<SuiAddress> = HashSet::new();
577 addresses.extend(
578 object_index_changes
579 .deleted_owners
580 .iter()
581 .map(|(owner, _)| *owner),
582 );
583 addresses.extend(
584 object_index_changes
585 .new_owners
586 .iter()
587 .map(|((owner, _), _)| *owner),
588 );
589 let _locks = self.caches.locks.acquire_locks(addresses.into_iter());
590 let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
591 HashMap::new();
592 let (input_coins, written_coins) = tx_coins.unwrap();
594
595 let coin_delete_keys = input_coins
597 .values()
598 .filter_map(|object| {
599 let Owner::AddressOwner(owner) = object.owner() else {
601 return None;
602 };
603
604 let (coin_type, coin) = object
606 .coin_type_maybe()
607 .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
608
609 let key = CoinIndexKey2::new(
610 *owner,
611 coin_type.to_string(),
612 coin.balance.value(),
613 object.id(),
614 );
615
616 let map = balance_changes.entry(*owner).or_default();
617 let entry = map.entry(coin_type).or_insert(TotalBalance {
618 num_coins: 0,
619 balance: 0,
620 });
621 entry.num_coins -= 1;
622 entry.balance -= coin.balance.value() as i128;
623
624 Some(key)
625 })
626 .collect::<Vec<_>>();
627 trace!(
628 tx_digset=?digest,
629 "coin_delete_keys: {:?}",
630 coin_delete_keys,
631 );
632 batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;
633
634 let coin_add_keys = written_coins
636 .values()
637 .filter_map(|object| {
638 let Owner::AddressOwner(owner) = object.owner() else {
640 return None;
641 };
642
643 let (coin_type, coin) = object
645 .coin_type_maybe()
646 .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;
647
648 let key = CoinIndexKey2::new(
649 *owner,
650 coin_type.to_string(),
651 coin.balance.value(),
652 object.id(),
653 );
654 let value = CoinInfo {
655 version: object.version(),
656 digest: object.digest(),
657 balance: coin.balance.value(),
658 previous_transaction: object.previous_transaction,
659 };
660 let map = balance_changes.entry(*owner).or_default();
661 let entry = map.entry(coin_type).or_insert(TotalBalance {
662 num_coins: 0,
663 balance: 0,
664 });
665 entry.num_coins += 1;
666 entry.balance += coin.balance.value() as i128;
667
668 Some((key, value))
669 })
670 .collect::<Vec<_>>();
671 trace!(
672 tx_digset=?digest,
673 "coin_add_keys: {:?}",
674 coin_add_keys,
675 );
676
677 batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;
678
679 let per_coin_type_balance_changes: Vec<_> = balance_changes
680 .iter()
681 .flat_map(|(address, balance_map)| {
682 balance_map.iter().map(|(type_tag, balance)| {
683 (
684 (*address, type_tag.clone()),
685 Ok::<TotalBalance, SuiError>(*balance),
686 )
687 })
688 })
689 .collect();
690 let all_balance_changes: Vec<_> = balance_changes
691 .into_iter()
692 .map(|(address, balance_map)| {
693 (
694 address,
695 Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
696 )
697 })
698 .collect();
699 let cache_updates = IndexStoreCacheUpdates {
700 _locks,
701 per_coin_type_balance_changes,
702 all_balance_changes,
703 };
704 Ok(cache_updates)
705 }
706
707 #[instrument(skip_all)]
708 pub fn index_tx(
709 &self,
710 sender: SuiAddress,
711 active_inputs: impl Iterator<Item = ObjectID>,
712 mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
713 move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
714 events: &TransactionEvents,
715 object_index_changes: ObjectIndexChanges,
716 digest: &TransactionDigest,
717 timestamp_ms: u64,
718 tx_coins: Option<TxCoins>,
719 ) -> SuiResult<u64> {
720 let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
721 let mut batch = self.tables.transactions_from_addr.batch();
722
723 batch.insert_batch(
724 &self.tables.transaction_order,
725 std::iter::once((sequence, *digest)),
726 )?;
727
728 batch.insert_batch(
729 &self.tables.transactions_seq,
730 std::iter::once((*digest, sequence)),
731 )?;
732
733 batch.insert_batch(
734 &self.tables.transactions_from_addr,
735 std::iter::once(((sender, sequence), *digest)),
736 )?;
737
738 #[allow(deprecated)]
739 if !self.remove_deprecated_tables {
740 batch.insert_batch(
741 &self.tables.transactions_by_input_object_id,
742 active_inputs.map(|id| ((id, sequence), *digest)),
743 )?;
744
745 batch.insert_batch(
746 &self.tables.transactions_by_mutated_object_id,
747 mutated_objects
748 .clone()
749 .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
750 )?;
751 }
752
753 batch.insert_batch(
754 &self.tables.transactions_by_move_function,
755 move_functions
756 .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
757 )?;
758
759 batch.insert_batch(
760 &self.tables.transactions_to_addr,
761 mutated_objects.filter_map(|(_, owner)| {
762 owner
763 .get_address_owner_address()
764 .ok()
765 .map(|addr| ((addr, sequence), digest))
766 }),
767 )?;
768
769 let cache_updates = self.index_coin(digest, &mut batch, &object_index_changes, tx_coins)?;
771
772 batch.delete_batch(
774 &self.tables.owner_index,
775 object_index_changes.deleted_owners.into_iter(),
776 )?;
777 batch.delete_batch(
778 &self.tables.dynamic_field_index,
779 object_index_changes.deleted_dynamic_fields.into_iter(),
780 )?;
781
782 batch.insert_batch(
783 &self.tables.owner_index,
784 object_index_changes.new_owners.into_iter(),
785 )?;
786
787 batch.insert_batch(
788 &self.tables.dynamic_field_index,
789 object_index_changes.new_dynamic_fields.into_iter(),
790 )?;
791
792 let event_digest = events.digest();
794 batch.insert_batch(
795 &self.tables.event_order,
796 events
797 .data
798 .iter()
799 .enumerate()
800 .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
801 )?;
802 batch.insert_batch(
803 &self.tables.event_by_move_module,
804 events
805 .data
806 .iter()
807 .enumerate()
808 .map(|(i, e)| {
809 (
810 i,
811 ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
812 )
813 })
814 .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
815 )?;
816 batch.insert_batch(
817 &self.tables.event_by_sender,
818 events.data.iter().enumerate().map(|(i, e)| {
819 (
820 (e.sender, (sequence, i)),
821 (event_digest, *digest, timestamp_ms),
822 )
823 }),
824 )?;
825 batch.insert_batch(
826 &self.tables.event_by_move_event,
827 events.data.iter().enumerate().map(|(i, e)| {
828 (
829 (e.type_.clone(), (sequence, i)),
830 (event_digest, *digest, timestamp_ms),
831 )
832 }),
833 )?;
834
835 batch.insert_batch(
836 &self.tables.event_by_time,
837 events.data.iter().enumerate().map(|(i, _)| {
838 (
839 (timestamp_ms, (sequence, i)),
840 (event_digest, *digest, timestamp_ms),
841 )
842 }),
843 )?;
844
845 batch.insert_batch(
846 &self.tables.event_by_event_module,
847 events.data.iter().enumerate().map(|(i, e)| {
848 (
849 (
850 ModuleId::new(e.type_.address, e.type_.module.clone()),
851 (sequence, i),
852 ),
853 (event_digest, *digest, timestamp_ms),
854 )
855 }),
856 )?;
857
858 let invalidate_caches =
859 read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;
860
861 if invalidate_caches {
862 self.invalidate_per_coin_type_cache(
864 cache_updates
865 .per_coin_type_balance_changes
866 .iter()
867 .map(|x| x.0.clone()),
868 )?;
869 self.invalidate_all_balance_cache(
870 cache_updates.all_balance_changes.iter().map(|x| x.0),
871 )?;
872 }
873
874 batch.write()?;
875
876 if !invalidate_caches {
877 self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)?;
881 self.update_all_balance_cache(cache_updates.all_balance_changes)?;
882 }
883 Ok(sequence)
884 }
885
886 pub fn next_sequence_number(&self) -> TxSequenceNumber {
887 self.next_sequence_number.load(Ordering::SeqCst) + 1
888 }
889
890 #[instrument(skip(self))]
891 pub fn get_transactions(
892 &self,
893 filter: Option<TransactionFilter>,
894 cursor: Option<TransactionDigest>,
895 limit: Option<usize>,
896 reverse: bool,
897 ) -> SuiResult<Vec<TransactionDigest>> {
898 let cursor = if let Some(cursor) = cursor {
900 Some(
901 self.get_transaction_seq(&cursor)?
902 .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
903 )
904 } else {
905 None
906 };
907 match filter {
908 Some(TransactionFilter::MoveFunction {
909 package,
910 module,
911 function,
912 }) => Ok(self.get_transactions_by_move_function(
913 package, module, function, cursor, limit, reverse,
914 )?),
915 Some(TransactionFilter::InputObject(object_id)) => {
916 Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
917 }
918 Some(TransactionFilter::ChangedObject(object_id)) => {
919 Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
920 }
921 Some(TransactionFilter::FromAddress(address)) => {
922 Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
923 }
924 Some(TransactionFilter::ToAddress(address)) => {
925 Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
926 }
927 Some(_) => Err(SuiErrorKind::UserInputError {
930 error: UserInputError::Unsupported(format!("{:?}", filter)),
931 }
932 .into()),
933 None => {
934 if reverse {
935 let iter = self
936 .tables
937 .transaction_order
938 .reversed_safe_iter_with_bounds(
939 None,
940 Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
941 )?
942 .skip(usize::from(cursor.is_some()))
943 .map(|result| result.map(|(_, digest)| digest));
944 if let Some(limit) = limit {
945 Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
946 } else {
947 Ok(iter.collect::<Result<Vec<_>, _>>()?)
948 }
949 } else {
950 let iter = self
951 .tables
952 .transaction_order
953 .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
954 .skip(usize::from(cursor.is_some()))
955 .map(|result| result.map(|(_, digest)| digest));
956 if let Some(limit) = limit {
957 Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
958 } else {
959 Ok(iter.collect::<Result<Vec<_>, _>>()?)
960 }
961 }
962 }
963 }
964 }
965
966 #[instrument(skip_all)]
967 fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
968 index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
969 key: KeyT,
970 cursor: Option<TxSequenceNumber>,
971 limit: Option<usize>,
972 reverse: bool,
973 ) -> SuiResult<Vec<TransactionDigest>> {
974 Ok(if reverse {
975 let iter = index
976 .reversed_safe_iter_with_bounds(
977 None,
978 Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
979 )?
980 .skip(usize::from(cursor.is_some()))
982 .take_while(|result| {
983 result
984 .as_ref()
985 .map(|((id, _), _)| *id == key)
986 .unwrap_or(false)
987 })
988 .map(|result| result.map(|(_, digest)| digest));
989 if let Some(limit) = limit {
990 iter.take(limit).collect::<Result<Vec<_>, _>>()?
991 } else {
992 iter.collect::<Result<Vec<_>, _>>()?
993 }
994 } else {
995 let iter = index
996 .safe_iter_with_bounds(
997 Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
998 None,
999 )
1000 .skip(usize::from(cursor.is_some()))
1002 .map(|result| result.expect("iterator db error"))
1003 .take_while(|((id, _), _)| *id == key)
1004 .map(|(_, digest)| digest);
1005 if let Some(limit) = limit {
1006 iter.take(limit).collect()
1007 } else {
1008 iter.collect()
1009 }
1010 })
1011 }
1012
1013 #[instrument(skip(self))]
1014 pub fn get_transactions_by_input_object(
1015 &self,
1016 input_object: ObjectID,
1017 cursor: Option<TxSequenceNumber>,
1018 limit: Option<usize>,
1019 reverse: bool,
1020 ) -> SuiResult<Vec<TransactionDigest>> {
1021 if self.remove_deprecated_tables {
1022 return Ok(vec![]);
1023 }
1024 #[allow(deprecated)]
1025 Self::get_transactions_from_index(
1026 &self.tables.transactions_by_input_object_id,
1027 input_object,
1028 cursor,
1029 limit,
1030 reverse,
1031 )
1032 }
1033
1034 #[instrument(skip(self))]
1035 pub fn get_transactions_by_mutated_object(
1036 &self,
1037 mutated_object: ObjectID,
1038 cursor: Option<TxSequenceNumber>,
1039 limit: Option<usize>,
1040 reverse: bool,
1041 ) -> SuiResult<Vec<TransactionDigest>> {
1042 if self.remove_deprecated_tables {
1043 return Ok(vec![]);
1044 }
1045 #[allow(deprecated)]
1046 Self::get_transactions_from_index(
1047 &self.tables.transactions_by_mutated_object_id,
1048 mutated_object,
1049 cursor,
1050 limit,
1051 reverse,
1052 )
1053 }
1054
1055 #[instrument(skip(self))]
1056 pub fn get_transactions_from_addr(
1057 &self,
1058 addr: SuiAddress,
1059 cursor: Option<TxSequenceNumber>,
1060 limit: Option<usize>,
1061 reverse: bool,
1062 ) -> SuiResult<Vec<TransactionDigest>> {
1063 Self::get_transactions_from_index(
1064 &self.tables.transactions_from_addr,
1065 addr,
1066 cursor,
1067 limit,
1068 reverse,
1069 )
1070 }
1071
1072 #[instrument(skip(self))]
1073 pub fn get_transactions_by_move_function(
1074 &self,
1075 package: ObjectID,
1076 module: Option<String>,
1077 function: Option<String>,
1078 cursor: Option<TxSequenceNumber>,
1079 limit: Option<usize>,
1080 reverse: bool,
1081 ) -> SuiResult<Vec<TransactionDigest>> {
1082 if function.is_some() && module.is_none() {
1084 return Err(SuiErrorKind::UserInputError {
1085 error: UserInputError::MoveFunctionInputError(
1086 "Cannot supply function without supplying module".to_string(),
1087 ),
1088 }
1089 .into());
1090 }
1091
1092 if cursor.is_some() && (module.is_none() || function.is_none()) {
1094 return Err(SuiErrorKind::UserInputError {
1095 error: UserInputError::MoveFunctionInputError(
1096 "Cannot supply cursor without supplying module and function".to_string(),
1097 ),
1098 }
1099 .into());
1100 }
1101
1102 let cursor_val = cursor.unwrap_or(if reverse {
1103 TxSequenceNumber::MAX
1104 } else {
1105 TxSequenceNumber::MIN
1106 });
1107
1108 let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
1109 let module_val = module.clone().unwrap_or(if reverse {
1110 max_string.clone()
1111 } else {
1112 "".to_string()
1113 });
1114
1115 let function_val =
1116 function
1117 .clone()
1118 .unwrap_or(if reverse { max_string } else { "".to_string() });
1119
1120 let key = (package, module_val, function_val, cursor_val);
1121 Ok(if reverse {
1122 let iter = self
1123 .tables
1124 .transactions_by_move_function
1125 .reversed_safe_iter_with_bounds(None, Some(key))?
1126 .skip(usize::from(cursor.is_some()))
1128 .take_while(|result| {
1129 result
1130 .as_ref()
1131 .map(|((id, m, f, _), _)| {
1132 *id == package
1133 && module.as_ref().map(|x| x == m).unwrap_or(true)
1134 && function.as_ref().map(|x| x == f).unwrap_or(true)
1135 })
1136 .unwrap_or(false)
1137 })
1138 .map(|result| result.map(|(_, digest)| digest));
1139 if let Some(limit) = limit {
1140 iter.take(limit).collect::<Result<Vec<_>, _>>()?
1141 } else {
1142 iter.collect::<Result<Vec<_>, _>>()?
1143 }
1144 } else {
1145 let iter = self
1146 .tables
1147 .transactions_by_move_function
1148 .safe_iter_with_bounds(Some(key), None)
1149 .map(|result| result.expect("iterator db error"))
1150 .skip(usize::from(cursor.is_some()))
1152 .take_while(|((id, m, f, _), _)| {
1153 *id == package
1154 && module.as_ref().map(|x| x == m).unwrap_or(true)
1155 && function.as_ref().map(|x| x == f).unwrap_or(true)
1156 })
1157 .map(|(_, digest)| digest);
1158 if let Some(limit) = limit {
1159 iter.take(limit).collect()
1160 } else {
1161 iter.collect()
1162 }
1163 })
1164 }
1165
1166 #[instrument(skip(self))]
1167 pub fn get_transactions_to_addr(
1168 &self,
1169 addr: SuiAddress,
1170 cursor: Option<TxSequenceNumber>,
1171 limit: Option<usize>,
1172 reverse: bool,
1173 ) -> SuiResult<Vec<TransactionDigest>> {
1174 Self::get_transactions_from_index(
1175 &self.tables.transactions_to_addr,
1176 addr,
1177 cursor,
1178 limit,
1179 reverse,
1180 )
1181 }
1182
1183 #[instrument(skip(self))]
1184 pub fn get_transaction_seq(
1185 &self,
1186 digest: &TransactionDigest,
1187 ) -> SuiResult<Option<TxSequenceNumber>> {
1188 Ok(self.tables.transactions_seq.get(digest)?)
1189 }
1190
1191 #[instrument(skip(self))]
1192 pub fn all_events(
1193 &self,
1194 tx_seq: TxSequenceNumber,
1195 event_seq: usize,
1196 limit: usize,
1197 descending: bool,
1198 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1199 Ok(if descending {
1200 self.tables
1201 .event_order
1202 .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
1203 .take(limit)
1204 .map(|result| {
1205 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1206 (digest, tx_digest, event_seq, time)
1207 })
1208 })
1209 .collect::<Result<Vec<_>, _>>()?
1210 } else {
1211 self.tables
1212 .event_order
1213 .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
1214 .take(limit)
1215 .map(|result| {
1216 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1217 (digest, tx_digest, event_seq, time)
1218 })
1219 })
1220 .collect::<Result<Vec<_>, _>>()?
1221 })
1222 }
1223
1224 #[instrument(skip(self))]
1225 pub fn events_by_transaction(
1226 &self,
1227 digest: &TransactionDigest,
1228 tx_seq: TxSequenceNumber,
1229 event_seq: usize,
1230 limit: usize,
1231 descending: bool,
1232 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1233 let seq = self
1234 .get_transaction_seq(digest)?
1235 .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
1236 Ok(if descending {
1237 self.tables
1238 .event_order
1239 .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
1240 .take_while(|result| {
1241 result
1242 .as_ref()
1243 .map(|((tx, _), _)| tx == &seq)
1244 .unwrap_or(false)
1245 })
1246 .take(limit)
1247 .map(|result| {
1248 result.map(|((_, event_seq), (digest, tx_digest, time))| {
1249 (digest, tx_digest, event_seq, time)
1250 })
1251 })
1252 .collect::<Result<Vec<_>, _>>()?
1253 } else {
1254 self.tables
1255 .event_order
1256 .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
1257 .map(|result| result.expect("iterator db error"))
1258 .take_while(|((tx, _), _)| tx == &seq)
1259 .take(limit)
1260 .map(|((_, event_seq), (digest, tx_digest, time))| {
1261 (digest, tx_digest, event_seq, time)
1262 })
1263 .collect()
1264 })
1265 }
1266
1267 #[instrument(skip_all)]
1268 fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
1269 index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
1270 key: &KeyT,
1271 tx_seq: TxSequenceNumber,
1272 event_seq: usize,
1273 limit: usize,
1274 descending: bool,
1275 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1276 Ok(if descending {
1277 index
1278 .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
1279 .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
1280 .take(limit)
1281 .map(|result| {
1282 result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1283 (digest, tx_digest, event_seq, time)
1284 })
1285 })
1286 .collect::<Result<Vec<_>, _>>()?
1287 } else {
1288 index
1289 .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
1290 .map(|result| result.expect("iterator db error"))
1291 .take_while(|((m, _), _)| m == key)
1292 .take(limit)
1293 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1294 (digest, tx_digest, event_seq, time)
1295 })
1296 .collect()
1297 })
1298 }
1299
1300 #[instrument(skip(self))]
1301 pub fn events_by_module_id(
1302 &self,
1303 module: &ModuleId,
1304 tx_seq: TxSequenceNumber,
1305 event_seq: usize,
1306 limit: usize,
1307 descending: bool,
1308 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1309 Self::get_event_from_index(
1310 &self.tables.event_by_move_module,
1311 module,
1312 tx_seq,
1313 event_seq,
1314 limit,
1315 descending,
1316 )
1317 }
1318
1319 #[instrument(skip(self))]
1320 pub fn events_by_move_event_struct_name(
1321 &self,
1322 struct_name: &StructTag,
1323 tx_seq: TxSequenceNumber,
1324 event_seq: usize,
1325 limit: usize,
1326 descending: bool,
1327 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1328 Self::get_event_from_index(
1329 &self.tables.event_by_move_event,
1330 struct_name,
1331 tx_seq,
1332 event_seq,
1333 limit,
1334 descending,
1335 )
1336 }
1337
1338 #[instrument(skip(self))]
1339 pub fn events_by_move_event_module(
1340 &self,
1341 module_id: &ModuleId,
1342 tx_seq: TxSequenceNumber,
1343 event_seq: usize,
1344 limit: usize,
1345 descending: bool,
1346 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1347 Self::get_event_from_index(
1348 &self.tables.event_by_event_module,
1349 module_id,
1350 tx_seq,
1351 event_seq,
1352 limit,
1353 descending,
1354 )
1355 }
1356
1357 #[instrument(skip(self))]
1358 pub fn events_by_sender(
1359 &self,
1360 sender: &SuiAddress,
1361 tx_seq: TxSequenceNumber,
1362 event_seq: usize,
1363 limit: usize,
1364 descending: bool,
1365 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1366 Self::get_event_from_index(
1367 &self.tables.event_by_sender,
1368 sender,
1369 tx_seq,
1370 event_seq,
1371 limit,
1372 descending,
1373 )
1374 }
1375
1376 #[instrument(skip(self))]
1377 pub fn event_iterator(
1378 &self,
1379 start_time: u64,
1380 end_time: u64,
1381 tx_seq: TxSequenceNumber,
1382 event_seq: usize,
1383 limit: usize,
1384 descending: bool,
1385 ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
1386 Ok(if descending {
1387 self.tables
1388 .event_by_time
1389 .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
1390 .take_while(|result| {
1391 result
1392 .as_ref()
1393 .map(|((m, _), _)| m >= &start_time)
1394 .unwrap_or(false)
1395 })
1396 .take(limit)
1397 .map(|result| {
1398 result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1399 (digest, tx_digest, event_seq, time)
1400 })
1401 })
1402 .collect::<Result<Vec<_>, _>>()?
1403 } else {
1404 self.tables
1405 .event_by_time
1406 .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
1407 .map(|result| result.expect("iterator db error"))
1408 .take_while(|((m, _), _)| m <= &end_time)
1409 .take(limit)
1410 .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
1411 (digest, tx_digest, event_seq, time)
1412 })
1413 .collect()
1414 })
1415 }
1416
1417 pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
1418 match self
1419 .tables
1420 .event_by_time
1421 .reversed_safe_iter_with_bounds(
1422 None,
1423 Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
1424 )?
1425 .next()
1426 .transpose()?
1427 {
1428 Some(((_, (watermark, _)), _)) => {
1429 if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
1430 info!(
1431 "json rpc index pruning. Watermark is {} with digest {}",
1432 watermark, digest
1433 );
1434 }
1435 self.pruner_watermark.store(watermark, Ordering::Relaxed);
1436 self.tables.pruner_watermark.insert(&(), &watermark)?;
1437 Ok(watermark)
1438 }
1439 None => Ok(0),
1440 }
1441 }
1442
1443 pub fn get_dynamic_fields_iterator(
1444 &self,
1445 object: ObjectID,
1446 cursor: Option<ObjectID>,
1447 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
1448 {
1449 debug!(?object, "get_dynamic_fields");
1450 let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
1452 let iter_upper_bound = (object, ObjectID::MAX);
1453 Ok(self
1454 .tables
1455 .dynamic_field_index
1456 .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
1457 .skip(usize::from(cursor.is_some()))
1459 .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
1460 .map_ok(|((_, c), object_info)| (c, object_info)))
1461 }
1462
1463 #[instrument(skip(self))]
1464 pub fn get_dynamic_field_object_id(
1465 &self,
1466 object: ObjectID,
1467 name_type: TypeTag,
1468 name_bcs_bytes: &[u8],
1469 ) -> SuiResult<Option<ObjectID>> {
1470 debug!(?object, "get_dynamic_field_object_id");
1471 let dynamic_field_id =
1472 dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
1473 |e| {
1474 SuiErrorKind::Unknown(format!(
1475 "Unable to generate dynamic field id. Got error: {e:?}"
1476 ))
1477 },
1478 )?;
1479
1480 if let Some(info) = self
1481 .tables
1482 .dynamic_field_index
1483 .get(&(object, dynamic_field_id))?
1484 {
1485 debug_assert!(
1487 info.object_id == dynamic_field_id
1488 || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
1489 );
1490 return Ok(Some(info.object_id));
1491 }
1492
1493 let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
1494 let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
1495 let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
1496 object,
1497 &dynamic_object_field_type,
1498 name_bcs_bytes,
1499 )
1500 .map_err(|e| {
1501 SuiErrorKind::Unknown(format!(
1502 "Unable to generate dynamic field id. Got error: {e:?}"
1503 ))
1504 })?;
1505 if let Some(info) = self
1506 .tables
1507 .dynamic_field_index
1508 .get(&(object, dynamic_object_field_id))?
1509 {
1510 return Ok(Some(info.object_id));
1511 }
1512
1513 Ok(None)
1514 }
1515
1516 #[instrument(skip(self))]
1517 pub fn get_owner_objects(
1518 &self,
1519 owner: SuiAddress,
1520 cursor: Option<ObjectID>,
1521 limit: usize,
1522 filter: Option<SuiObjectDataFilter>,
1523 ) -> SuiResult<Vec<ObjectInfo>> {
1524 let cursor = match cursor {
1525 Some(cursor) => cursor,
1526 None => ObjectID::ZERO,
1527 };
1528 Ok(self
1529 .get_owner_objects_iterator(owner, cursor, filter)?
1530 .take(limit)
1531 .collect())
1532 }
1533
1534 pub fn get_owned_coins_iterator(
1535 coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
1536 owner: SuiAddress,
1537 coin_type_tag: Option<String>,
1538 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1539 let all_coins = coin_type_tag.is_none();
1540 let starting_coin_type =
1541 coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
1542 let start_key =
1543 CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
1544 Ok(coin_index
1545 .safe_iter_with_bounds(Some(start_key), None)
1546 .map(|result| result.expect("iterator db error"))
1547 .take_while(move |(key, _)| {
1548 if key.owner != owner {
1549 return false;
1550 }
1551 if !all_coins && starting_coin_type != key.coin_type {
1552 return false;
1553 }
1554 true
1555 }))
1556 }
1557
1558 pub fn get_owned_coins_iterator_with_cursor(
1559 &self,
1560 owner: SuiAddress,
1561 cursor: (String, u64, ObjectID),
1562 limit: usize,
1563 one_coin_type_only: bool,
1564 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
1565 let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
1566 let start_key = CoinIndexKey2::new_from_cursor(
1567 owner,
1568 starting_coin_type.clone(),
1569 inverted_balance,
1570 starting_object_id,
1571 );
1572 Ok(self
1573 .tables
1574 .coin_index_2
1575 .safe_iter_with_bounds(Some(start_key), None)
1576 .map(|result| result.expect("iterator db error"))
1577 .filter(move |(key, _)| key.object_id != starting_object_id)
1578 .enumerate()
1579 .take_while(move |(index, (key, _))| {
1580 if *index >= limit {
1581 return false;
1582 }
1583 if key.owner != owner {
1584 return false;
1585 }
1586 if one_coin_type_only && starting_coin_type != key.coin_type {
1587 return false;
1588 }
1589 true
1590 })
1591 .map(|(_index, (key, info))| (key, info)))
1592 }
1593
1594 pub fn get_owner_objects_iterator(
1597 &self,
1598 owner: SuiAddress,
1599 starting_object_id: ObjectID,
1600 filter: Option<SuiObjectDataFilter>,
1601 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
1602 Ok(self
1603 .tables
1604 .owner_index
1605 .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
1607 .map(|result| result.expect("iterator db error"))
1608 .skip(usize::from(starting_object_id != ObjectID::ZERO))
1609 .take_while(move |((address_owner, _), _)| address_owner == &owner)
1610 .filter(move |(_, o)| {
1611 if let Some(filter) = filter.as_ref() {
1612 filter.matches(o)
1613 } else {
1614 true
1615 }
1616 })
1617 .map(|(_, object_info)| object_info))
1618 }
1619
1620 pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
1621 let mut batch = self.tables.owner_index.batch();
1622 batch.insert_batch(
1623 &self.tables.owner_index,
1624 object_index_changes.new_owners.into_iter(),
1625 )?;
1626 batch.insert_batch(
1627 &self.tables.dynamic_field_index,
1628 object_index_changes.new_dynamic_fields.into_iter(),
1629 )?;
1630 batch.write()?;
1631 Ok(())
1632 }
1633
1634 pub fn is_empty(&self) -> bool {
1635 self.tables.owner_index.is_empty()
1636 }
1637
1638 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1639 self.tables
1641 .transactions_from_addr
1642 .checkpoint_db(path)
1643 .map_err(Into::into)
1644 }
1645
1646 #[instrument(skip(self))]
1651 pub fn get_balance(&self, owner: SuiAddress, coin_type: TypeTag) -> SuiResult<TotalBalance> {
1652 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1653 let cloned_coin_type = coin_type.clone();
1654 let metrics_cloned = self.metrics.clone();
1655 let coin_index_cloned = self.tables.coin_index_2.clone();
1656 if force_disable_cache {
1657 Self::get_balance_from_db(metrics_cloned, coin_index_cloned, owner, cloned_coin_type)
1658 .map_err(|e| {
1659 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1660 })?;
1661 }
1662
1663 self.metrics.balance_lookup_from_total.inc();
1664
1665 let balance = self
1666 .caches
1667 .per_coin_type_balance
1668 .get(&(owner, coin_type.clone()));
1669 if let Some(balance) = balance {
1670 return balance;
1671 }
1672 let all_balance = self.caches.all_balances.get(&owner.clone());
1674 if let Some(Ok(all_balance)) = all_balance
1675 && let Some(balance) = all_balance.get(&coin_type)
1676 {
1677 return Ok(*balance);
1678 }
1679 let cloned_coin_type = coin_type.clone();
1680 let metrics_cloned = self.metrics.clone();
1681 let coin_index_cloned = self.tables.coin_index_2.clone();
1682 self.caches
1683 .per_coin_type_balance
1684 .get_with((owner, coin_type), move || {
1685 Self::get_balance_from_db(
1686 metrics_cloned,
1687 coin_index_cloned,
1688 owner,
1689 cloned_coin_type,
1690 )
1691 .map_err(|e| {
1692 SuiErrorKind::ExecutionError(format!("Failed to read balance frm DB: {:?}", e))
1693 .into()
1694 })
1695 })
1696 }
1697
1698 #[instrument(skip(self))]
1704 pub fn get_all_balance(
1705 &self,
1706 owner: SuiAddress,
1707 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1708 let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
1709 let metrics_cloned = self.metrics.clone();
1710 let coin_index_cloned = self.tables.coin_index_2.clone();
1711 if force_disable_cache {
1712 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(
1713 |e| {
1714 SuiErrorKind::ExecutionError(format!(
1715 "Failed to read all balance from DB: {:?}",
1716 e
1717 ))
1718 },
1719 )?;
1720 }
1721
1722 self.metrics.all_balance_lookup_from_total.inc();
1723 let metrics_cloned = self.metrics.clone();
1724 let coin_index_cloned = self.tables.coin_index_2.clone();
1725 self.caches.all_balances.get_with(owner, move || {
1726 Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
1727 SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
1728 .into()
1729 })
1730 })
1731 }
1732
1733 #[instrument(skip_all)]
1735 pub fn get_balance_from_db(
1736 metrics: Arc<IndexStoreMetrics>,
1737 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1738 owner: SuiAddress,
1739 coin_type: TypeTag,
1740 ) -> SuiResult<TotalBalance> {
1741 metrics.balance_lookup_from_db.inc();
1742 let coin_type_str = coin_type.to_string();
1743 let coins =
1744 Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;
1745
1746 let mut balance = 0i128;
1747 let mut num_coins = 0;
1748 for (_key, coin_info) in coins {
1749 balance += coin_info.balance as i128;
1750 num_coins += 1;
1751 }
1752 Ok(TotalBalance { balance, num_coins })
1753 }
1754
1755 #[instrument(skip_all)]
1757 pub fn get_all_balances_from_db(
1758 metrics: Arc<IndexStoreMetrics>,
1759 coin_index: DBMap<CoinIndexKey2, CoinInfo>,
1760 owner: SuiAddress,
1761 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1762 metrics.all_balance_lookup_from_db.inc();
1763 let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
1764 let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
1765 .chunk_by(|(key, _coin)| key.coin_type.clone());
1766 for (coin_type, coins) in &coins {
1767 let mut total_balance = 0i128;
1768 let mut coin_object_count = 0;
1769 for (_, coin_info) in coins {
1770 total_balance += coin_info.balance as i128;
1771 coin_object_count += 1;
1772 }
1773 let coin_type =
1774 TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
1775 SuiErrorKind::ExecutionError(format!(
1776 "Failed to parse event sender address: {:?}",
1777 e
1778 ))
1779 })?));
1780 balances.insert(
1781 coin_type,
1782 TotalBalance {
1783 num_coins: coin_object_count,
1784 balance: total_balance,
1785 },
1786 );
1787 }
1788 Ok(Arc::new(balances))
1789 }
1790
1791 fn invalidate_per_coin_type_cache(
1792 &self,
1793 keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
1794 ) -> SuiResult {
1795 self.caches.per_coin_type_balance.batch_invalidate(keys);
1796 Ok(())
1797 }
1798
1799 fn invalidate_all_balance_cache(
1800 &self,
1801 addresses: impl IntoIterator<Item = SuiAddress>,
1802 ) -> SuiResult {
1803 self.caches.all_balances.batch_invalidate(addresses);
1804 Ok(())
1805 }
1806
1807 fn update_per_coin_type_cache(
1808 &self,
1809 keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
1810 ) -> SuiResult {
1811 self.caches
1812 .per_coin_type_balance
1813 .batch_merge(keys, Self::merge_balance);
1814 Ok(())
1815 }
1816
1817 fn merge_balance(
1818 old_balance: &SuiResult<TotalBalance>,
1819 balance_delta: &SuiResult<TotalBalance>,
1820 ) -> SuiResult<TotalBalance> {
1821 if let Ok(old_balance) = old_balance {
1822 if let Ok(balance_delta) = balance_delta {
1823 Ok(TotalBalance {
1824 balance: old_balance.balance + balance_delta.balance,
1825 num_coins: old_balance.num_coins + balance_delta.num_coins,
1826 })
1827 } else {
1828 balance_delta.clone()
1829 }
1830 } else {
1831 old_balance.clone()
1832 }
1833 }
1834
1835 fn update_all_balance_cache(
1836 &self,
1837 keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
1838 ) -> SuiResult {
1839 self.caches
1840 .all_balances
1841 .batch_merge(keys, Self::merge_all_balance);
1842 Ok(())
1843 }
1844
1845 fn merge_all_balance(
1846 old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1847 balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
1848 ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
1849 if let Ok(old_balance) = old_balance {
1850 if let Ok(balance_delta) = balance_delta {
1851 let mut new_balance = HashMap::new();
1852 for (key, value) in old_balance.iter() {
1853 new_balance.insert(key.clone(), *value);
1854 }
1855 for (key, delta) in balance_delta.iter() {
1856 let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
1857 balance: 0,
1858 num_coins: 0,
1859 });
1860 let new_total = TotalBalance {
1861 balance: old.balance + delta.balance,
1862 num_coins: old.num_coins + delta.num_coins,
1863 };
1864 new_balance.insert(key.clone(), new_total);
1865 }
1866 Ok(Arc::new(new_balance))
1867 } else {
1868 balance_delta.clone()
1869 }
1870 } else {
1871 old_balance.clone()
1872 }
1873 }
1874}
1875
1876#[cfg(test)]
1877mod tests {
1878 use super::IndexStore;
1879 use super::ObjectIndexChanges;
1880 use move_core_types::account_address::AccountAddress;
1881 use prometheus::Registry;
1882 use std::collections::BTreeMap;
1883 use std::env::temp_dir;
1884 use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
1885 use sui_types::digests::TransactionDigest;
1886 use sui_types::effects::TransactionEvents;
1887 use sui_types::gas_coin::GAS;
1888 use sui_types::object;
1889 use sui_types::object::Owner;
1890
1891 #[tokio::test]
1892 async fn test_index_cache() -> anyhow::Result<()> {
1893 let index_store =
1901 IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
1902 let address: SuiAddress = AccountAddress::random().into();
1903 let mut written_objects = BTreeMap::new();
1904 let mut input_objects = BTreeMap::new();
1905 let mut object_map = BTreeMap::new();
1906
1907 let mut new_objects = vec![];
1908 for _i in 0..10 {
1909 let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
1910 new_objects.push((
1911 (address, object.id()),
1912 ObjectInfo {
1913 object_id: object.id(),
1914 version: object.version(),
1915 digest: object.digest(),
1916 type_: ObjectType::Struct(object.type_().unwrap().clone()),
1917 owner: Owner::AddressOwner(address),
1918 previous_transaction: object.previous_transaction,
1919 },
1920 ));
1921 object_map.insert(object.id(), object.clone());
1922 written_objects.insert(object.data.id(), object);
1923 }
1924 let object_index_changes = ObjectIndexChanges {
1925 deleted_owners: vec![],
1926 deleted_dynamic_fields: vec![],
1927 new_owners: new_objects,
1928 new_dynamic_fields: vec![],
1929 };
1930
1931 let tx_coins = (input_objects.clone(), written_objects.clone());
1932 index_store.index_tx(
1933 address,
1934 vec![].into_iter(),
1935 vec![].into_iter(),
1936 vec![].into_iter(),
1937 &TransactionEvents { data: vec![] },
1938 object_index_changes,
1939 &TransactionDigest::random(),
1940 1234,
1941 Some(tx_coins),
1942 )?;
1943
1944 let balance_from_db = IndexStore::get_balance_from_db(
1945 index_store.metrics.clone(),
1946 index_store.tables.coin_index_2.clone(),
1947 address,
1948 GAS::type_tag(),
1949 )?;
1950 let balance = index_store.get_balance(address, GAS::type_tag())?;
1951 assert_eq!(balance, balance_from_db);
1952 assert_eq!(balance.balance, 1000);
1953 assert_eq!(balance.num_coins, 10);
1954
1955 let all_balance = index_store.get_all_balance(address)?;
1956 let balance = all_balance.get(&GAS::type_tag()).unwrap();
1957 assert_eq!(*balance, balance_from_db);
1958 assert_eq!(balance.balance, 1000);
1959 assert_eq!(balance.num_coins, 10);
1960
1961 written_objects.clear();
1962 let mut deleted_objects = vec![];
1963 for (id, object) in object_map.iter().take(3) {
1964 deleted_objects.push((address, *id));
1965 input_objects.insert(*id, object.to_owned());
1966 }
1967 let object_index_changes = ObjectIndexChanges {
1968 deleted_owners: deleted_objects.clone(),
1969 deleted_dynamic_fields: vec![],
1970 new_owners: vec![],
1971 new_dynamic_fields: vec![],
1972 };
1973 let tx_coins = (input_objects, written_objects);
1974 index_store.index_tx(
1975 address,
1976 vec![].into_iter(),
1977 vec![].into_iter(),
1978 vec![].into_iter(),
1979 &TransactionEvents { data: vec![] },
1980 object_index_changes,
1981 &TransactionDigest::random(),
1982 1234,
1983 Some(tx_coins),
1984 )?;
1985 let balance_from_db = IndexStore::get_balance_from_db(
1986 index_store.metrics.clone(),
1987 index_store.tables.coin_index_2.clone(),
1988 address,
1989 GAS::type_tag(),
1990 )?;
1991 let balance = index_store.get_balance(address, GAS::type_tag())?;
1992 assert_eq!(balance, balance_from_db);
1993 assert_eq!(balance.balance, 700);
1994 assert_eq!(balance.num_coins, 7);
1995 index_store
1998 .caches
1999 .per_coin_type_balance
2000 .invalidate(&(address, GAS::type_tag()));
2001 let all_balance = index_store.get_all_balance(address)?;
2002 assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
2003 assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
2004 let balance = index_store.get_balance(address, GAS::type_tag())?;
2005 assert_eq!(balance, balance_from_db);
2006 assert_eq!(balance.balance, 700);
2007 assert_eq!(balance.num_coins, 7);
2008
2009 Ok(())
2010 }
2011
2012 #[tokio::test]
2013 async fn test_get_transaction_by_move_function() {
2014 use sui_types::base_types::ObjectID;
2015 use typed_store::Map;
2016
2017 let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
2018 let db = &index_store.tables.transactions_by_move_function;
2019 db.insert(
2020 &(
2021 ObjectID::new([1; 32]),
2022 "mod".to_string(),
2023 "f".to_string(),
2024 0,
2025 ),
2026 &[0; 32].into(),
2027 )
2028 .unwrap();
2029 db.insert(
2030 &(
2031 ObjectID::new([1; 32]),
2032 "mod".to_string(),
2033 "Z".repeat(128),
2034 0,
2035 ),
2036 &[1; 32].into(),
2037 )
2038 .unwrap();
2039 db.insert(
2040 &(
2041 ObjectID::new([1; 32]),
2042 "mod".to_string(),
2043 "f".repeat(128),
2044 0,
2045 ),
2046 &[2; 32].into(),
2047 )
2048 .unwrap();
2049 db.insert(
2050 &(
2051 ObjectID::new([1; 32]),
2052 "mod".to_string(),
2053 "z".repeat(128),
2054 0,
2055 ),
2056 &[3; 32].into(),
2057 )
2058 .unwrap();
2059
2060 let mut v = index_store
2061 .get_transactions_by_move_function(
2062 ObjectID::new([1; 32]),
2063 Some("mod".to_string()),
2064 None,
2065 None,
2066 None,
2067 false,
2068 )
2069 .unwrap();
2070 let v_rev = index_store
2071 .get_transactions_by_move_function(
2072 ObjectID::new([1; 32]),
2073 Some("mod".to_string()),
2074 None,
2075 None,
2076 None,
2077 true,
2078 )
2079 .unwrap();
2080 v.reverse();
2081 assert_eq!(v, v_rev);
2082 }
2083}