use std::cmp::{max, min};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bincode::Options;
use itertools::Itertools;
use move_core_types::language_storage::{ModuleId, StructTag, TypeTag};
use parking_lot::ArcMutexGuard;
use prometheus::{
    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry,
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use sui_types::accumulator_event::AccumulatorEvent;
use typed_store::TypedStoreError;
use typed_store::rocksdb::compaction_filter::Decision;

use sui_json_rpc_types::{SuiObjectDataFilter, TransactionFilter};
use sui_storage::mutex_table::MutexTable;
use sui_storage::sharded_lru::ShardedLruCache;
use sui_types::base_types::{
    ObjectDigest, ObjectID, SequenceNumber, SuiAddress, TransactionDigest, TxSequenceNumber,
};
use sui_types::base_types::{ObjectInfo, ObjectRef};
use sui_types::digests::TransactionEventsDigest;
use sui_types::dynamic_field::{self, DynamicFieldInfo};
use sui_types::effects::TransactionEvents;
use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
use sui_types::inner_temporary_store::TxCoins;
use sui_types::object::{Object, Owner};
use sui_types::parse_sui_struct_tag;
use sui_types::storage::error::Error as StorageError;
use tracing::{debug, info, instrument, trace};
use typed_store::DBMapUtils;
use typed_store::rocks::{
    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
    read_size_from_env,
};
use typed_store::traits::Map;

type OwnedMutexGuard<T> = ArcMutexGuard<parking_lot::RawMutex, T>;

type OwnerIndexKey = (SuiAddress, ObjectID);
type DynamicFieldKey = (ObjectID, ObjectID);
type EventId = (TxSequenceNumber, usize);
type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
type AllBalance = HashMap<TypeTag, TotalBalance>;

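/// Key for the `coin_index_2` table. The balance is stored bitwise-inverted
/// (`inverted_balance = !balance`), so RocksDB's ascending key order returns an owner's
/// coins of a given type from largest to smallest balance.
///
/// A minimal illustration of that ordering invariant (the values below are made up for
/// the example; only the `!balance` trick comes from this file):
///
/// ```ignore
/// let owner = SuiAddress::ZERO;
/// let big = CoinIndexKey2::new(owner, "0x2::sui::SUI".to_string(), 1_000, ObjectID::ZERO);
/// let small = CoinIndexKey2::new(owner, "0x2::sui::SUI".to_string(), 10, ObjectID::ZERO);
/// // `!1_000 < !10`, so the larger coin sorts first.
/// assert!(big < small);
/// ```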
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CoinIndexKey2 {
    pub owner: SuiAddress,
    pub coin_type: String,
    pub inverted_balance: u64,
    pub object_id: ObjectID,
}

impl CoinIndexKey2 {
    pub fn new_from_cursor(
        owner: SuiAddress,
        coin_type: String,
        inverted_balance: u64,
        object_id: ObjectID,
    ) -> Self {
        Self {
            owner,
            coin_type,
            inverted_balance,
            object_id,
        }
    }

    pub fn new(owner: SuiAddress, coin_type: String, balance: u64, object_id: ObjectID) -> Self {
        Self {
            owner,
            coin_type,
            inverted_balance: !balance,
            object_id,
        }
    }
}

const CURRENT_DB_VERSION: u64 = 0;
const _CURRENT_COIN_INDEX_VERSION: u64 = 1;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    version: u64,
    column_families: BTreeMap<String, ColumnFamilyInfo>,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ColumnFamilyInfo {
    version: u64,
}

pub const MAX_TX_RANGE_SIZE: u64 = 4096;

pub const MAX_GET_OWNED_OBJECT_SIZE: usize = 256;
const ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB: &str = "COIN_INDEX_BLOCK_CACHE_MB";
const ENV_VAR_DISABLE_INDEX_CACHE: &str = "DISABLE_INDEX_CACHE";
const ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE: &str = "INVALIDATE_INSTEAD_OF_UPDATE";

#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
pub struct TotalBalance {
    pub balance: i128,
    pub num_coins: i64,
    pub address_balance: u64,
}

#[derive(Debug)]
pub struct ObjectIndexChanges {
    pub deleted_owners: Vec<OwnerIndexKey>,
    pub deleted_dynamic_fields: Vec<DynamicFieldKey>,
    pub new_owners: Vec<(OwnerIndexKey, ObjectInfo)>,
    pub new_dynamic_fields: Vec<(DynamicFieldKey, DynamicFieldInfo)>,
}

#[derive(Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Debug)]
pub struct CoinInfo {
    pub version: SequenceNumber,
    pub digest: ObjectDigest,
    pub balance: u64,
    pub previous_transaction: TransactionDigest,
}

impl CoinInfo {
    pub fn from_object(object: &Object) -> Option<CoinInfo> {
        object.as_coin_maybe().map(|coin| CoinInfo {
            version: object.version(),
            digest: object.digest(),
            previous_transaction: object.previous_transaction,
            balance: coin.value(),
        })
    }
}

pub struct IndexStoreMetrics {
    balance_lookup_from_db: IntCounter,
    balance_lookup_from_total: IntCounter,
    all_balance_lookup_from_db: IntCounter,
    all_balance_lookup_from_total: IntCounter,
}

impl IndexStoreMetrics {
    pub fn new(registry: &Registry) -> IndexStoreMetrics {
        Self {
            balance_lookup_from_db: register_int_counter_with_registry!(
                "balance_lookup_from_db",
                "Total number of balance requests served from the DB",
                registry,
            )
            .unwrap(),
            balance_lookup_from_total: register_int_counter_with_registry!(
                "balance_lookup_from_total",
                "Total number of balance requests served",
                registry,
            )
            .unwrap(),
            all_balance_lookup_from_db: register_int_counter_with_registry!(
                "all_balance_lookup_from_db",
                "Total number of all balance requests served from the DB",
                registry,
            )
            .unwrap(),
            all_balance_lookup_from_total: register_int_counter_with_registry!(
                "all_balance_lookup_from_total",
                "Total number of all balance requests served",
                registry,
            )
            .unwrap(),
        }
    }
}

pub struct IndexStoreCaches {
    per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
    all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
    pub locks: MutexTable<SuiAddress>,
}

#[derive(Default)]
pub struct IndexStoreCacheUpdates {
    _locks: Vec<OwnedMutexGuard<()>>,
    per_coin_type_balance_changes: Vec<((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    all_balance_changes: Vec<(SuiAddress, SuiResult<Arc<AllBalance>>)>,
}

#[derive(DBMapUtils)]
pub struct IndexStoreTables {
    // Store-level metadata: DB version and per-column-family info.
    meta: DBMap<(), MetadataInfo>,

    // Transactions indexed by sender address.
    transactions_from_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    // Transactions indexed by recipient address.
    transactions_to_addr: DBMap<(SuiAddress, TxSequenceNumber), TransactionDigest>,

    #[deprecated]
    transactions_by_input_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    #[deprecated]
    transactions_by_mutated_object_id: DBMap<(ObjectID, TxSequenceNumber), TransactionDigest>,

    transactions_by_move_function:
        DBMap<(ObjectID, String, String, TxSequenceNumber), TransactionDigest>,

    transaction_order: DBMap<TxSequenceNumber, TransactionDigest>,

    transactions_seq: DBMap<TransactionDigest, TxSequenceNumber>,

    // Objects indexed by their owning address.
    owner_index: DBMap<OwnerIndexKey, ObjectInfo>,

    coin_index_2: DBMap<CoinIndexKey2, CoinInfo>,
    address_balances: DBMap<(SuiAddress, TypeTag), ()>,

    dynamic_field_index: DBMap<DynamicFieldKey, DynamicFieldInfo>,

    event_order: DBMap<EventId, EventIndex>,
    event_by_move_module: DBMap<(ModuleId, EventId), EventIndex>,
    event_by_move_event: DBMap<(StructTag, EventId), EventIndex>,
    event_by_event_module: DBMap<(ModuleId, EventId), EventIndex>,
    event_by_sender: DBMap<(SuiAddress, EventId), EventIndex>,
    event_by_time: DBMap<(u64, EventId), EventIndex>,

    // Lowest transaction sequence number that must be retained; read by the compaction filters.
    pruner_watermark: DBMap<(), TxSequenceNumber>,
}

impl IndexStoreTables {
    pub fn owner_index(&self) -> &DBMap<OwnerIndexKey, ObjectInfo> {
        &self.owner_index
    }

    pub fn coin_index(&self) -> &DBMap<CoinIndexKey2, CoinInfo> {
        &self.coin_index_2
    }

    #[allow(deprecated)]
    fn init(&mut self) -> Result<(), StorageError> {
        let metadata = {
            match self.meta.get(&()) {
                Ok(Some(metadata)) => metadata,
                Ok(None) | Err(_) => MetadataInfo {
                    version: CURRENT_DB_VERSION,
                    column_families: BTreeMap::new(),
                },
            }
        };

        self.meta.insert(&(), &metadata)?;

        Ok(())
    }

    pub fn get_dynamic_fields_iterator(
        &self,
        object: ObjectID,
        cursor: Option<ObjectID>,
    ) -> impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_ {
        debug!(?object, "get_dynamic_fields");
        let iter_lower_bound = (object, cursor.unwrap_or(ObjectID::ZERO));
        let iter_upper_bound = (object, ObjectID::MAX);
        self.dynamic_field_index
            .safe_iter_with_bounds(Some(iter_lower_bound), Some(iter_upper_bound))
            .skip(usize::from(cursor.is_some()))
            .take_while(move |result| result.is_err() || (result.as_ref().unwrap().0.0 == object))
            .map_ok(|((_, c), object_info)| (c, object_info))
    }
}

pub struct IndexStore {
    next_sequence_number: AtomicU64,
    tables: IndexStoreTables,
    pub caches: IndexStoreCaches,
    metrics: Arc<IndexStoreMetrics>,
    max_type_length: u64,
    remove_deprecated_tables: bool,
    pruner_watermark: Arc<AtomicU64>,
}

struct JsonRpcCompactionMetrics {
    key_removed: IntCounterVec,
    key_kept: IntCounterVec,
    key_error: IntCounterVec,
}

impl JsonRpcCompactionMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        Arc::new(Self {
            key_removed: register_int_counter_vec_with_registry!(
                "json_rpc_compaction_filter_key_removed",
                "Compaction key removed",
                &["cf"],
                registry
            )
            .unwrap(),
            key_kept: register_int_counter_vec_with_registry!(
                "json_rpc_compaction_filter_key_kept",
                "Compaction key kept",
                &["cf"],
                registry
            )
            .unwrap(),
            key_error: register_int_counter_vec_with_registry!(
                "json_rpc_compaction_filter_key_error",
                "Compaction error",
                &["cf"],
                registry
            )
            .unwrap(),
        })
    }
}

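/// Attaches a RocksDB compaction filter to `db_options` that drops entries whose
/// transaction sequence number is below the pruner watermark. `extractor` pulls the
/// sequence number out of either the key (`by_key == true`) or the value, decoded with
/// bincode big-endian/fixint options.
///
/// A usage sketch, mirroring how the per-table options are built in `new_without_init`
/// below:
///
/// ```ignore
/// let opts = compaction_filter_config_by_key(
///     "transactions_from_addr",
///     compaction_metrics.clone(),
///     default_db_options(),
///     pruner_watermark.clone(),
///     |(_, seq): (SuiAddress, TxSequenceNumber)| seq,
/// );
/// ```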
fn compaction_filter_config<T: DeserializeOwned>(
    name: &str,
    metrics: Arc<JsonRpcCompactionMetrics>,
    mut db_options: DBOptions,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
    by_key: bool,
) -> DBOptions {
    let cf = name.to_string();
    db_options
        .options
        .set_compaction_filter(name, move |_, key, value| {
            let bytes = if by_key { key } else { value };
            let deserializer = bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding();
            match deserializer.deserialize(bytes) {
                Ok(key_data) => {
                    let sequence_number = extractor(key_data);
                    if sequence_number < pruner_watermark.load(Ordering::Relaxed) {
                        metrics.key_removed.with_label_values(&[&cf]).inc();
                        Decision::Remove
                    } else {
                        metrics.key_kept.with_label_values(&[&cf]).inc();
                        Decision::Keep
                    }
                }
                Err(_) => {
                    metrics.key_error.with_label_values(&[&cf]).inc();
                    Decision::Keep
                }
            }
        });
    db_options
}

fn compaction_filter_config_by_key<T: DeserializeOwned>(
    name: &str,
    metrics: Arc<JsonRpcCompactionMetrics>,
    db_options: DBOptions,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> TxSequenceNumber + Send + Sync + 'static,
) -> DBOptions {
    compaction_filter_config(name, metrics, db_options, pruner_watermark, extractor, true)
}

fn coin_index_table_default_config() -> DBOptions {
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(
            read_size_from_env(ENV_VAR_COIN_INDEX_BLOCK_CACHE_SIZE_MB).unwrap_or(5 * 1024),
        )
        .disable_write_throttling()
}

impl IndexStore {
    pub fn new_without_init(
        path: PathBuf,
        registry: &Registry,
        max_type_length: Option<u64>,
        remove_deprecated_tables: bool,
    ) -> Self {
        let db_options = default_db_options().disable_write_throttling();
        let pruner_watermark = Arc::new(AtomicU64::new(0));
        let compaction_metrics = JsonRpcCompactionMetrics::new(registry);
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "transactions_from_addr".to_string(),
                compaction_filter_config_by_key(
                    "transactions_from_addr",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, id): (SuiAddress, TxSequenceNumber)| id,
                ),
            ),
            (
                "transactions_to_addr".to_string(),
                compaction_filter_config_by_key(
                    "transactions_to_addr",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, sequence_number): (SuiAddress, TxSequenceNumber)| sequence_number,
                ),
            ),
            (
                "transactions_by_move_function".to_string(),
                compaction_filter_config_by_key(
                    "transactions_by_move_function",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, _, _, id): (ObjectID, String, String, TxSequenceNumber)| id,
                ),
            ),
            (
                "transaction_order".to_string(),
                compaction_filter_config_by_key(
                    "transaction_order",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |sequence_number: TxSequenceNumber| sequence_number,
                ),
            ),
            (
                "transactions_seq".to_string(),
                compaction_filter_config(
                    "transactions_seq",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |sequence_number: TxSequenceNumber| sequence_number,
                    false,
                ),
            ),
            (
                "coin_index_2".to_string(),
                coin_index_table_default_config(),
            ),
            (
                "event_order".to_string(),
                compaction_filter_config_by_key(
                    "event_order",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |event_id: EventId| event_id.0,
                ),
            ),
            (
                "event_by_move_module".to_string(),
                compaction_filter_config_by_key(
                    "event_by_move_module",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (ModuleId, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_event_module".to_string(),
                compaction_filter_config_by_key(
                    "event_by_event_module",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (ModuleId, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_sender".to_string(),
                compaction_filter_config_by_key(
                    "event_by_sender",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (SuiAddress, EventId)| event_id.0,
                ),
            ),
            (
                "event_by_time".to_string(),
                compaction_filter_config_by_key(
                    "event_by_time",
                    compaction_metrics.clone(),
                    db_options.clone(),
                    pruner_watermark.clone(),
                    |(_, event_id): (u64, EventId)| event_id.0,
                ),
            ),
        ]));
        let tables = IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path,
            MetricConf::new("index"),
            Some(db_options.options),
            Some(table_options),
            remove_deprecated_tables,
        );

        let metrics = IndexStoreMetrics::new(registry);
        let caches = IndexStoreCaches {
            per_coin_type_balance: ShardedLruCache::new(1_000_000, 1000),
            all_balances: ShardedLruCache::new(1_000_000, 1000),
            locks: MutexTable::new(128),
        };
        let next_sequence_number = tables
            .transaction_order
            .reversed_safe_iter_with_bounds(None, None)
            .expect("failed to initialize indexes")
            .next()
            .transpose()
            .expect("failed to initialize indexes")
            .map(|(seq, _)| seq + 1)
            .unwrap_or(0)
            .into();
        let pruner_watermark_value = tables
            .pruner_watermark
            .get(&())
            .expect("failed to initialize index tables")
            .unwrap_or(0);
        pruner_watermark.store(pruner_watermark_value, Ordering::Relaxed);

        Self {
            tables,
            next_sequence_number,
            caches,
            metrics: Arc::new(metrics),
            max_type_length: max_type_length.unwrap_or(128),
            remove_deprecated_tables,
            pruner_watermark,
        }
    }

    pub fn new(
        path: PathBuf,
        registry: &Registry,
        max_type_length: Option<u64>,
        remove_deprecated_tables: bool,
    ) -> Self {
        let mut store =
            Self::new_without_init(path, registry, max_type_length, remove_deprecated_tables);
        store.tables.init().unwrap();
        store
    }

    pub fn tables(&self) -> &IndexStoreTables {
        &self.tables
    }

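    /// Updates the coin index for one transaction: deletes the `coin_index_2` entries for
    /// input coins, inserts entries for written coins, and accumulates per-owner,
    /// per-coin-type balance deltas. The returned `IndexStoreCacheUpdates` carries those
    /// deltas (plus the per-address locks taken here) so the caller can apply or
    /// invalidate the balance caches after the batch is committed.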
    #[instrument(skip_all)]
    pub fn index_coin(
        &self,
        digest: &TransactionDigest,
        batch: &mut DBBatch,
        object_index_changes: &ObjectIndexChanges,
        tx_coins: Option<TxCoins>,
    ) -> SuiResult<IndexStoreCacheUpdates> {
        if tx_coins.is_none() {
            return Ok(IndexStoreCacheUpdates::default());
        }
        let mut addresses: HashSet<SuiAddress> = HashSet::new();
        addresses.extend(
            object_index_changes
                .deleted_owners
                .iter()
                .map(|(owner, _)| *owner),
        );
        addresses.extend(
            object_index_changes
                .new_owners
                .iter()
                .map(|((owner, _), _)| *owner),
        );
        let _locks = self.caches.locks.acquire_locks(addresses.into_iter());
        let mut balance_changes: HashMap<SuiAddress, HashMap<TypeTag, TotalBalance>> =
            HashMap::new();
        let (input_coins, written_coins) = tx_coins.unwrap();

        let coin_delete_keys = input_coins
            .values()
            .filter_map(|object| {
                let Owner::AddressOwner(owner) = object.owner() else {
                    return None;
                };

                let (coin_type, coin) = object
                    .coin_type_maybe()
                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;

                let key = CoinIndexKey2::new(
                    *owner,
                    coin_type.to_string(),
                    coin.balance.value(),
                    object.id(),
                );

                let map = balance_changes.entry(*owner).or_default();
                let entry = map.entry(coin_type).or_insert(TotalBalance {
                    num_coins: 0,
                    balance: 0,
                    address_balance: 0,
                });
                entry.num_coins -= 1;
                entry.balance -= coin.balance.value() as i128;

                Some(key)
            })
            .collect::<Vec<_>>();
        trace!(
            tx_digest=?digest,
            "coin_delete_keys: {:?}",
            coin_delete_keys,
        );
        batch.delete_batch(&self.tables.coin_index_2, coin_delete_keys)?;

        let coin_add_keys = written_coins
            .values()
            .filter_map(|object| {
                let Owner::AddressOwner(owner) = object.owner() else {
                    return None;
                };

                let (coin_type, coin) = object
                    .coin_type_maybe()
                    .and_then(|coin_type| object.as_coin_maybe().map(|coin| (coin_type, coin)))?;

                let key = CoinIndexKey2::new(
                    *owner,
                    coin_type.to_string(),
                    coin.balance.value(),
                    object.id(),
                );
                let value = CoinInfo {
                    version: object.version(),
                    digest: object.digest(),
                    balance: coin.balance.value(),
                    previous_transaction: object.previous_transaction,
                };
                let map = balance_changes.entry(*owner).or_default();
                let entry = map.entry(coin_type).or_insert(TotalBalance {
                    num_coins: 0,
                    balance: 0,
                    address_balance: 0,
                });
                entry.num_coins += 1;
                entry.balance += coin.balance.value() as i128;

                Some((key, value))
            })
            .collect::<Vec<_>>();
        trace!(
            tx_digest=?digest,
            "coin_add_keys: {:?}",
            coin_add_keys,
        );

        batch.insert_batch(&self.tables.coin_index_2, coin_add_keys)?;

        let per_coin_type_balance_changes: Vec<_> = balance_changes
            .iter()
            .flat_map(|(address, balance_map)| {
                balance_map.iter().map(|(type_tag, balance)| {
                    (
                        (*address, type_tag.clone()),
                        Ok::<TotalBalance, SuiError>(*balance),
                    )
                })
            })
            .collect();
        let all_balance_changes: Vec<_> = balance_changes
            .into_iter()
            .map(|(address, balance_map)| {
                (
                    address,
                    Ok::<Arc<HashMap<TypeTag, TotalBalance>>, SuiError>(Arc::new(balance_map)),
                )
            })
            .collect();
        let cache_updates = IndexStoreCacheUpdates {
            _locks,
            per_coin_type_balance_changes,
            all_balance_changes,
        };
        Ok(cache_updates)
    }

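    /// Indexes a single executed transaction: assigns it the next sequence number and
    /// writes the transaction, event, owner/dynamic-field, and coin index entries in one
    /// batch, then applies (or invalidates) the balance caches.
    ///
    /// A trimmed call sketch, following the pattern used by the tests at the bottom of
    /// this file:
    ///
    /// ```ignore
    /// let seq = index_store.index_tx(
    ///     sender,
    ///     vec![].into_iter(),                 // active input objects
    ///     vec![].into_iter(),                 // (ObjectRef, Owner) of mutated objects
    ///     vec![].into_iter(),                 // (package, module, function) calls
    ///     &TransactionEvents { data: vec![] },
    ///     object_index_changes,
    ///     &digest,
    ///     timestamp_ms,
    ///     Some((input_coins, written_coins)), // TxCoins
    ///     vec![],                             // accumulator events
    /// )?;
    /// ```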
    #[instrument(skip_all)]
    pub fn index_tx(
        &self,
        sender: SuiAddress,
        active_inputs: impl Iterator<Item = ObjectID>,
        mutated_objects: impl Iterator<Item = (ObjectRef, Owner)> + Clone,
        move_functions: impl Iterator<Item = (ObjectID, String, String)> + Clone,
        events: &TransactionEvents,
        object_index_changes: ObjectIndexChanges,
        digest: &TransactionDigest,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        accumulator_events: Vec<AccumulatorEvent>,
    ) -> SuiResult<u64> {
        let sequence = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
        let mut batch = self.tables.transactions_from_addr.batch();

        batch.insert_batch(
            &self.tables.transaction_order,
            std::iter::once((sequence, *digest)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_seq,
            std::iter::once((*digest, sequence)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_from_addr,
            std::iter::once(((sender, sequence), *digest)),
        )?;

        #[allow(deprecated)]
        if !self.remove_deprecated_tables {
            batch.insert_batch(
                &self.tables.transactions_by_input_object_id,
                active_inputs.map(|id| ((id, sequence), *digest)),
            )?;

            batch.insert_batch(
                &self.tables.transactions_by_mutated_object_id,
                mutated_objects
                    .clone()
                    .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
            )?;
        }

        batch.insert_batch(
            &self.tables.transactions_by_move_function,
            move_functions
                .map(|(obj_id, module, function)| ((obj_id, module, function, sequence), *digest)),
        )?;

        batch.insert_batch(
            &self.tables.transactions_to_addr,
            mutated_objects.filter_map(|(_, owner)| {
                owner
                    .get_address_owner_address()
                    .ok()
                    .map(|addr| ((addr, sequence), digest))
            }),
        )?;

        let cache_updates = self.index_coin(digest, &mut batch, &object_index_changes, tx_coins)?;

        let address_balance_updates = accumulator_events.into_iter().filter_map(|event| {
            let ty = &event.write.address.ty;
            let coin_type = sui_types::balance::Balance::maybe_get_balance_type_param(ty)?;
            Some(((event.write.address.address, coin_type), ()))
        });
        batch.insert_batch(&self.tables.address_balances, address_balance_updates)?;

        batch.delete_batch(
            &self.tables.owner_index,
            object_index_changes.deleted_owners.into_iter(),
        )?;
        batch.delete_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.deleted_dynamic_fields.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.owner_index,
            object_index_changes.new_owners.into_iter(),
        )?;

        batch.insert_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.new_dynamic_fields.into_iter(),
        )?;

        let event_digest = events.digest();
        batch.insert_batch(
            &self.tables.event_order,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, _)| ((sequence, i), (event_digest, *digest, timestamp_ms))),
        )?;
        batch.insert_batch(
            &self.tables.event_by_move_module,
            events
                .data
                .iter()
                .enumerate()
                .map(|(i, e)| {
                    (
                        i,
                        ModuleId::new(e.package_id.into(), e.transaction_module.clone()),
                    )
                })
                .map(|(i, m)| ((m, (sequence, i)), (event_digest, *digest, timestamp_ms))),
        )?;
        batch.insert_batch(
            &self.tables.event_by_sender,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.sender, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;
        batch.insert_batch(
            &self.tables.event_by_move_event,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (e.type_.clone(), (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        batch.insert_batch(
            &self.tables.event_by_time,
            events.data.iter().enumerate().map(|(i, _)| {
                (
                    (timestamp_ms, (sequence, i)),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        batch.insert_batch(
            &self.tables.event_by_event_module,
            events.data.iter().enumerate().map(|(i, e)| {
                (
                    (
                        ModuleId::new(e.type_.address, e.type_.module.clone()),
                        (sequence, i),
                    ),
                    (event_digest, *digest, timestamp_ms),
                )
            }),
        )?;

        let invalidate_caches =
            read_size_from_env(ENV_VAR_INVALIDATE_INSTEAD_OF_UPDATE).unwrap_or(0) > 0;

        if invalidate_caches {
            self.invalidate_per_coin_type_cache(
                cache_updates
                    .per_coin_type_balance_changes
                    .iter()
                    .map(|x| x.0.clone()),
            )?;
            self.invalidate_all_balance_cache(
                cache_updates.all_balance_changes.iter().map(|x| x.0),
            )?;
        }

        batch.write()?;

        if !invalidate_caches {
            self.update_per_coin_type_cache(cache_updates.per_coin_type_balance_changes)?;
            self.update_all_balance_cache(cache_updates.all_balance_changes)?;
        }
        Ok(sequence)
    }

    pub fn next_sequence_number(&self) -> TxSequenceNumber {
        self.next_sequence_number.load(Ordering::SeqCst) + 1
    }

    #[instrument(skip(self))]
    pub fn get_transactions(
        &self,
        filter: Option<TransactionFilter>,
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        let cursor = if let Some(cursor) = cursor {
            Some(
                self.get_transaction_seq(&cursor)?
                    .ok_or(SuiErrorKind::TransactionNotFound { digest: cursor })?,
            )
        } else {
            None
        };
        match filter {
            Some(TransactionFilter::MoveFunction {
                package,
                module,
                function,
            }) => Ok(self.get_transactions_by_move_function(
                package, module, function, cursor, limit, reverse,
            )?),
            Some(TransactionFilter::InputObject(object_id)) => {
                Ok(self.get_transactions_by_input_object(object_id, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::ChangedObject(object_id)) => {
                Ok(self.get_transactions_by_mutated_object(object_id, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::FromAddress(address)) => {
                Ok(self.get_transactions_from_addr(address, cursor, limit, reverse)?)
            }
            Some(TransactionFilter::ToAddress(address)) => {
                Ok(self.get_transactions_to_addr(address, cursor, limit, reverse)?)
            }
            Some(_) => Err(SuiErrorKind::UserInputError {
                error: UserInputError::Unsupported(format!("{:?}", filter)),
            }
            .into()),
            None => {
                if reverse {
                    let iter = self
                        .tables
                        .transaction_order
                        .reversed_safe_iter_with_bounds(
                            None,
                            Some(cursor.unwrap_or(TxSequenceNumber::MAX)),
                        )?
                        .skip(usize::from(cursor.is_some()))
                        .map(|result| result.map(|(_, digest)| digest));
                    if let Some(limit) = limit {
                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
                    } else {
                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
                    }
                } else {
                    let iter = self
                        .tables
                        .transaction_order
                        .safe_iter_with_bounds(Some(cursor.unwrap_or(TxSequenceNumber::MIN)), None)
                        .skip(usize::from(cursor.is_some()))
                        .map(|result| result.map(|(_, digest)| digest));
                    if let Some(limit) = limit {
                        Ok(iter.take(limit).collect::<Result<Vec<_>, _>>()?)
                    } else {
                        Ok(iter.collect::<Result<Vec<_>, _>>()?)
                    }
                }
            }
        }
    }

    #[instrument(skip_all)]
    fn get_transactions_from_index<KeyT: Clone + Serialize + DeserializeOwned + PartialEq>(
        index: &DBMap<(KeyT, TxSequenceNumber), TransactionDigest>,
        key: KeyT,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Ok(if reverse {
            let iter = index
                .reversed_safe_iter_with_bounds(
                    None,
                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MAX))),
                )?
                .skip(usize::from(cursor.is_some()))
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((id, _), _)| *id == key)
                        .unwrap_or(false)
                })
                .map(|result| result.map(|(_, digest)| digest));
            if let Some(limit) = limit {
                iter.take(limit).collect::<Result<Vec<_>, _>>()?
            } else {
                iter.collect::<Result<Vec<_>, _>>()?
            }
        } else {
            let iter = index
                .safe_iter_with_bounds(
                    Some((key.clone(), cursor.unwrap_or(TxSequenceNumber::MIN))),
                    None,
                )
                .skip(usize::from(cursor.is_some()))
                .map(|result| result.expect("iterator db error"))
                .take_while(|((id, _), _)| *id == key)
                .map(|(_, digest)| digest);
            if let Some(limit) = limit {
                iter.take(limit).collect()
            } else {
                iter.collect()
            }
        })
    }

    #[instrument(skip(self))]
    pub fn get_transactions_by_input_object(
        &self,
        input_object: ObjectID,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        if self.remove_deprecated_tables {
            return Ok(vec![]);
        }
        #[allow(deprecated)]
        Self::get_transactions_from_index(
            &self.tables.transactions_by_input_object_id,
            input_object,
            cursor,
            limit,
            reverse,
        )
    }

    #[instrument(skip(self))]
    pub fn get_transactions_by_mutated_object(
        &self,
        mutated_object: ObjectID,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        if self.remove_deprecated_tables {
            return Ok(vec![]);
        }
        #[allow(deprecated)]
        Self::get_transactions_from_index(
            &self.tables.transactions_by_mutated_object_id,
            mutated_object,
            cursor,
            limit,
            reverse,
        )
    }

    #[instrument(skip(self))]
    pub fn get_transactions_from_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_from_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }

    #[instrument(skip(self))]
    pub fn get_transactions_by_move_function(
        &self,
        package: ObjectID,
        module: Option<String>,
        function: Option<String>,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        if function.is_some() && module.is_none() {
            return Err(SuiErrorKind::UserInputError {
                error: UserInputError::MoveFunctionInputError(
                    "Cannot supply function without supplying module".to_string(),
                ),
            }
            .into());
        }

        if cursor.is_some() && (module.is_none() || function.is_none()) {
            return Err(SuiErrorKind::UserInputError {
                error: UserInputError::MoveFunctionInputError(
                    "Cannot supply cursor without supplying module and function".to_string(),
                ),
            }
            .into());
        }

        let cursor_val = cursor.unwrap_or(if reverse {
            TxSequenceNumber::MAX
        } else {
            TxSequenceNumber::MIN
        });

        let max_string = "z".repeat(self.max_type_length.try_into().unwrap());
        let module_val = module.clone().unwrap_or(if reverse {
            max_string.clone()
        } else {
            "".to_string()
        });

        let function_val =
            function
                .clone()
                .unwrap_or(if reverse { max_string } else { "".to_string() });

        let key = (package, module_val, function_val, cursor_val);
        Ok(if reverse {
            let iter = self
                .tables
                .transactions_by_move_function
                .reversed_safe_iter_with_bounds(None, Some(key))?
                .skip(usize::from(cursor.is_some()))
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((id, m, f, _), _)| {
                            *id == package
                                && module.as_ref().map(|x| x == m).unwrap_or(true)
                                && function.as_ref().map(|x| x == f).unwrap_or(true)
                        })
                        .unwrap_or(false)
                })
                .map(|result| result.map(|(_, digest)| digest));
            if let Some(limit) = limit {
                iter.take(limit).collect::<Result<Vec<_>, _>>()?
            } else {
                iter.collect::<Result<Vec<_>, _>>()?
            }
        } else {
            let iter = self
                .tables
                .transactions_by_move_function
                .safe_iter_with_bounds(Some(key), None)
                .map(|result| result.expect("iterator db error"))
                .skip(usize::from(cursor.is_some()))
                .take_while(|((id, m, f, _), _)| {
                    *id == package
                        && module.as_ref().map(|x| x == m).unwrap_or(true)
                        && function.as_ref().map(|x| x == f).unwrap_or(true)
                })
                .map(|(_, digest)| digest);
            if let Some(limit) = limit {
                iter.take(limit).collect()
            } else {
                iter.collect()
            }
        })
    }

    #[instrument(skip(self))]
    pub fn get_transactions_to_addr(
        &self,
        addr: SuiAddress,
        cursor: Option<TxSequenceNumber>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        Self::get_transactions_from_index(
            &self.tables.transactions_to_addr,
            addr,
            cursor,
            limit,
            reverse,
        )
    }

    #[instrument(skip(self))]
    pub fn get_transaction_seq(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TxSequenceNumber>> {
        Ok(self.tables.transactions_seq.get(digest)?)
    }

    #[instrument(skip(self))]
    pub fn all_events(
        &self,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Ok(if descending {
            self.tables
                .event_order
                .reversed_safe_iter_with_bounds(None, Some((tx_seq, event_seq)))?
                .take(limit)
                .map(|result| {
                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            self.tables
                .event_order
                .safe_iter_with_bounds(Some((tx_seq, event_seq)), None)
                .take(limit)
                .map(|result| {
                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        })
    }

    #[instrument(skip(self))]
    pub fn events_by_transaction(
        &self,
        digest: &TransactionDigest,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        let seq = self
            .get_transaction_seq(digest)?
            .ok_or(SuiErrorKind::TransactionNotFound { digest: *digest })?;
        Ok(if descending {
            self.tables
                .event_order
                .reversed_safe_iter_with_bounds(None, Some((min(tx_seq, seq), event_seq)))?
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((tx, _), _)| tx == &seq)
                        .unwrap_or(false)
                })
                .take(limit)
                .map(|result| {
                    result.map(|((_, event_seq), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            self.tables
                .event_order
                .safe_iter_with_bounds(Some((max(tx_seq, seq), event_seq)), None)
                .map(|result| result.expect("iterator db error"))
                .take_while(|((tx, _), _)| tx == &seq)
                .take(limit)
                .map(|((_, event_seq), (digest, tx_digest, time))| {
                    (digest, tx_digest, event_seq, time)
                })
                .collect()
        })
    }

    #[instrument(skip_all)]
    fn get_event_from_index<KeyT: Clone + PartialEq + Serialize + DeserializeOwned>(
        index: &DBMap<(KeyT, EventId), (TransactionEventsDigest, TransactionDigest, u64)>,
        key: &KeyT,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Ok(if descending {
            index
                .reversed_safe_iter_with_bounds(None, Some((key.clone(), (tx_seq, event_seq))))?
                .take_while(|result| result.as_ref().map(|((m, _), _)| m == key).unwrap_or(false))
                .take(limit)
                .map(|result| {
                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            index
                .safe_iter_with_bounds(Some((key.clone(), (tx_seq, event_seq))), None)
                .map(|result| result.expect("iterator db error"))
                .take_while(|((m, _), _)| m == key)
                .take(limit)
                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                    (digest, tx_digest, event_seq, time)
                })
                .collect()
        })
    }

    #[instrument(skip(self))]
    pub fn events_by_module_id(
        &self,
        module: &ModuleId,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_move_module,
            module,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }

    #[instrument(skip(self))]
    pub fn events_by_move_event_struct_name(
        &self,
        struct_name: &StructTag,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_move_event,
            struct_name,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }

    #[instrument(skip(self))]
    pub fn events_by_move_event_module(
        &self,
        module_id: &ModuleId,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_event_module,
            module_id,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }

    #[instrument(skip(self))]
    pub fn events_by_sender(
        &self,
        sender: &SuiAddress,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Self::get_event_from_index(
            &self.tables.event_by_sender,
            sender,
            tx_seq,
            event_seq,
            limit,
            descending,
        )
    }

    #[instrument(skip(self))]
    pub fn event_iterator(
        &self,
        start_time: u64,
        end_time: u64,
        tx_seq: TxSequenceNumber,
        event_seq: usize,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<(TransactionEventsDigest, TransactionDigest, usize, u64)>> {
        Ok(if descending {
            self.tables
                .event_by_time
                .reversed_safe_iter_with_bounds(None, Some((end_time, (tx_seq, event_seq))))?
                .take_while(|result| {
                    result
                        .as_ref()
                        .map(|((m, _), _)| m >= &start_time)
                        .unwrap_or(false)
                })
                .take(limit)
                .map(|result| {
                    result.map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                        (digest, tx_digest, event_seq, time)
                    })
                })
                .collect::<Result<Vec<_>, _>>()?
        } else {
            self.tables
                .event_by_time
                .safe_iter_with_bounds(Some((start_time, (tx_seq, event_seq))), None)
                .map(|result| result.expect("iterator db error"))
                .take_while(|((m, _), _)| m <= &end_time)
                .take(limit)
                .map(|((_, (_, event_seq)), (digest, tx_digest, time))| {
                    (digest, tx_digest, event_seq, time)
                })
                .collect()
        })
    }

    pub fn prune(&self, cut_time_ms: u64) -> SuiResult<TxSequenceNumber> {
        match self
            .tables
            .event_by_time
            .reversed_safe_iter_with_bounds(
                None,
                Some((cut_time_ms, (TxSequenceNumber::MAX, usize::MAX))),
            )?
            .next()
            .transpose()?
        {
            Some(((_, (watermark, _)), _)) => {
                if let Some(digest) = self.tables.transaction_order.get(&watermark)? {
                    info!(
                        "json rpc index pruning. Watermark is {} with digest {}",
                        watermark, digest
                    );
                }
                self.pruner_watermark.store(watermark, Ordering::Relaxed);
                self.tables.pruner_watermark.insert(&(), &watermark)?;
                Ok(watermark)
            }
            None => Ok(0),
        }
    }

    pub fn get_dynamic_fields_iterator(
        &self,
        object: ObjectID,
        cursor: Option<ObjectID>,
    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
    {
        Ok(self.tables.get_dynamic_fields_iterator(object, cursor))
    }

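    /// Resolves the object id stored under a dynamic field name. The lookup is tried
    /// twice: first with `name_type` as-is (plain dynamic field), then with the name
    /// wrapped in the dynamic-object-field wrapper type, since dynamic object fields
    /// derive their field id from the wrapped type.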
    #[instrument(skip(self))]
    pub fn get_dynamic_field_object_id(
        &self,
        object: ObjectID,
        name_type: TypeTag,
        name_bcs_bytes: &[u8],
    ) -> SuiResult<Option<ObjectID>> {
        debug!(?object, "get_dynamic_field_object_id");
        let dynamic_field_id =
            dynamic_field::derive_dynamic_field_id(object, &name_type, name_bcs_bytes).map_err(
                |e| {
                    SuiErrorKind::Unknown(format!(
                        "Unable to generate dynamic field id. Got error: {e:?}"
                    ))
                },
            )?;

        if let Some(info) = self
            .tables
            .dynamic_field_index
            .get(&(object, dynamic_field_id))?
        {
            debug_assert!(
                info.object_id == dynamic_field_id
                    || matches!(name_type, TypeTag::Struct(tag) if DynamicFieldInfo::is_dynamic_object_field_wrapper(&tag))
            );
            return Ok(Some(info.object_id));
        }

        let dynamic_object_field_struct = DynamicFieldInfo::dynamic_object_field_wrapper(name_type);
        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
        let dynamic_object_field_id = dynamic_field::derive_dynamic_field_id(
            object,
            &dynamic_object_field_type,
            name_bcs_bytes,
        )
        .map_err(|e| {
            SuiErrorKind::Unknown(format!(
                "Unable to generate dynamic field id. Got error: {e:?}"
            ))
        })?;
        if let Some(info) = self
            .tables
            .dynamic_field_index
            .get(&(object, dynamic_object_field_id))?
        {
            return Ok(Some(info.object_id));
        }

        Ok(None)
    }

    #[instrument(skip(self))]
    pub fn get_owner_objects(
        &self,
        owner: SuiAddress,
        cursor: Option<ObjectID>,
        limit: usize,
        filter: Option<SuiObjectDataFilter>,
    ) -> SuiResult<Vec<ObjectInfo>> {
        let cursor = match cursor {
            Some(cursor) => cursor,
            None => ObjectID::ZERO,
        };
        Ok(self
            .get_owner_objects_iterator(owner, cursor, filter)?
            .take(limit)
            .collect())
    }

    pub fn get_address_balance_coin_types_iter(
        &self,
        owner: SuiAddress,
    ) -> impl Iterator<Item = TypeTag> {
        let start_key = (owner, TypeTag::Bool);
        self.tables()
            .address_balances
            .safe_iter_with_bounds(Some(start_key), None)
            .map(|result| result.expect("iterator db error"))
            .take_while(move |(key, _)| key.0 == owner)
            .map(|(key, _)| key.1)
    }

    pub fn get_owned_coins_iterator(
        coin_index: &DBMap<CoinIndexKey2, CoinInfo>,
        owner: SuiAddress,
        coin_type_tag: Option<String>,
    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
        let all_coins = coin_type_tag.is_none();
        let starting_coin_type =
            coin_type_tag.unwrap_or_else(|| String::from_utf8([0u8].to_vec()).unwrap());
        let start_key =
            CoinIndexKey2::new(owner, starting_coin_type.clone(), u64::MAX, ObjectID::ZERO);
        Ok(coin_index
            .safe_iter_with_bounds(Some(start_key), None)
            .map(|result| result.expect("iterator db error"))
            .take_while(move |(key, _)| {
                if key.owner != owner {
                    return false;
                }
                if !all_coins && starting_coin_type != key.coin_type {
                    return false;
                }
                true
            }))
    }

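    /// Iterates an owner's coins starting at the cursor
    /// `(coin_type, inverted_balance, object_id)`, skipping the cursor's own object id,
    /// yielding at most `limit` entries, and stopping at the end of the cursor's coin
    /// type when `one_coin_type_only` is set.
    ///
    /// A call sketch (the cursor values are illustrative only):
    ///
    /// ```ignore
    /// let page: Vec<_> = index_store
    ///     .get_owned_coins_iterator_with_cursor(
    ///         owner,
    ///         ("0x2::sui::SUI".to_string(), !1_000u64, last_object_id),
    ///         50,    // page size
    ///         true,  // stay within the cursor's coin type
    ///     )?
    ///     .collect();
    /// ```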
    pub fn get_owned_coins_iterator_with_cursor(
        &self,
        owner: SuiAddress,
        cursor: (String, u64, ObjectID),
        limit: usize,
        one_coin_type_only: bool,
    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
        let (starting_coin_type, inverted_balance, starting_object_id) = cursor;
        let start_key = CoinIndexKey2::new_from_cursor(
            owner,
            starting_coin_type.clone(),
            inverted_balance,
            starting_object_id,
        );
        Ok(self
            .tables
            .coin_index_2
            .safe_iter_with_bounds(Some(start_key), None)
            .map(|result| result.expect("iterator db error"))
            .filter(move |(key, _)| key.object_id != starting_object_id)
            .enumerate()
            .take_while(move |(index, (key, _))| {
                if *index >= limit {
                    return false;
                }
                if key.owner != owner {
                    return false;
                }
                if one_coin_type_only && starting_coin_type != key.coin_type {
                    return false;
                }
                true
            })
            .map(|(_index, (key, info))| (key, info)))
    }

    pub fn get_owner_objects_iterator(
        &self,
        owner: SuiAddress,
        starting_object_id: ObjectID,
        filter: Option<SuiObjectDataFilter>,
    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
        Ok(self
            .tables
            .owner_index
            .safe_iter_with_bounds(Some((owner, starting_object_id)), None)
            .map(|result| result.expect("iterator db error"))
            .skip(usize::from(starting_object_id != ObjectID::ZERO))
            .take_while(move |((address_owner, _), _)| address_owner == &owner)
            .filter(move |(_, o)| {
                if let Some(filter) = filter.as_ref() {
                    filter.matches(o)
                } else {
                    true
                }
            })
            .map(|(_, object_info)| object_info))
    }

    pub fn insert_genesis_objects(&self, object_index_changes: ObjectIndexChanges) -> SuiResult {
        let mut batch = self.tables.owner_index.batch();
        batch.insert_batch(
            &self.tables.owner_index,
            object_index_changes.new_owners.into_iter(),
        )?;
        batch.insert_batch(
            &self.tables.dynamic_field_index,
            object_index_changes.new_dynamic_fields.into_iter(),
        )?;
        batch.write()?;
        Ok(())
    }

    pub fn is_empty(&self) -> bool {
        self.tables.owner_index.is_empty()
    }

    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .transactions_from_addr
            .checkpoint_db(path)
            .map_err(Into::into)
    }

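    /// Returns the aggregate coin-object balance for `(owner, coin_type)`. Unless the
    /// cache is disabled via `DISABLE_INDEX_CACHE`, the per-coin-type LRU cache is
    /// consulted first, then the all-balances cache, and only then the coin index table;
    /// the DB result is memoized back into the per-coin-type cache via `get_with`.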
    #[instrument(skip(self))]
    pub fn get_coin_object_balance(
        &self,
        owner: SuiAddress,
        coin_type: TypeTag,
    ) -> SuiResult<TotalBalance> {
        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
        let cloned_coin_type = coin_type.clone();
        let metrics_cloned = self.metrics.clone();
        let coin_index_cloned = self.tables.coin_index_2.clone();
        if force_disable_cache {
            return Self::get_balance_from_db(
                metrics_cloned,
                coin_index_cloned,
                owner,
                cloned_coin_type,
            )
            .map_err(|e| {
                SuiErrorKind::ExecutionError(format!("Failed to read balance from DB: {:?}", e))
                    .into()
            });
        }

        self.metrics.balance_lookup_from_total.inc();

        let balance = self
            .caches
            .per_coin_type_balance
            .get(&(owner, coin_type.clone()));
        if let Some(balance) = balance {
            return balance;
        }
        let all_balance = self.caches.all_balances.get(&owner.clone());
        if let Some(Ok(all_balance)) = all_balance
            && let Some(balance) = all_balance.get(&coin_type)
        {
            return Ok(*balance);
        }
        let cloned_coin_type = coin_type.clone();
        let metrics_cloned = self.metrics.clone();
        let coin_index_cloned = self.tables.coin_index_2.clone();
        self.caches
            .per_coin_type_balance
            .get_with((owner, coin_type), move || {
                Self::get_balance_from_db(
                    metrics_cloned,
                    coin_index_cloned,
                    owner,
                    cloned_coin_type,
                )
                .map_err(|e| {
                    SuiErrorKind::ExecutionError(format!("Failed to read balance from DB: {:?}", e))
                        .into()
                })
            })
    }

    #[instrument(skip(self))]
    pub fn get_all_coin_object_balances(
        &self,
        owner: SuiAddress,
    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
        let force_disable_cache = read_size_from_env(ENV_VAR_DISABLE_INDEX_CACHE).unwrap_or(0) > 0;
        let metrics_cloned = self.metrics.clone();
        let coin_index_cloned = self.tables.coin_index_2.clone();
        if force_disable_cache {
            return Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner)
                .map_err(|e| {
                    SuiErrorKind::ExecutionError(format!(
                        "Failed to read all balance from DB: {:?}",
                        e
                    ))
                    .into()
                });
        }

        self.metrics.all_balance_lookup_from_total.inc();
        let metrics_cloned = self.metrics.clone();
        let coin_index_cloned = self.tables.coin_index_2.clone();
        self.caches.all_balances.get_with(owner, move || {
            Self::get_all_balances_from_db(metrics_cloned, coin_index_cloned, owner).map_err(|e| {
                SuiErrorKind::ExecutionError(format!("Failed to read all balance from DB: {:?}", e))
                    .into()
            })
        })
    }

    #[instrument(skip_all)]
    pub fn get_balance_from_db(
        metrics: Arc<IndexStoreMetrics>,
        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
        owner: SuiAddress,
        coin_type: TypeTag,
    ) -> SuiResult<TotalBalance> {
        metrics.balance_lookup_from_db.inc();
        let coin_type_str = coin_type.to_string();
        let coins =
            Self::get_owned_coins_iterator(&coin_index, owner, Some(coin_type_str.clone()))?;

        let mut balance = 0i128;
        let mut num_coins = 0;
        for (_key, coin_info) in coins {
            balance += coin_info.balance as i128;
            num_coins += 1;
        }
        Ok(TotalBalance {
            balance,
            num_coins,
            address_balance: 0,
        })
    }

    #[instrument(skip_all)]
    pub fn get_all_balances_from_db(
        metrics: Arc<IndexStoreMetrics>,
        coin_index: DBMap<CoinIndexKey2, CoinInfo>,
        owner: SuiAddress,
    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
        metrics.all_balance_lookup_from_db.inc();
        let mut balances: HashMap<TypeTag, TotalBalance> = HashMap::new();
        let coins = Self::get_owned_coins_iterator(&coin_index, owner, None)?
            .chunk_by(|(key, _coin)| key.coin_type.clone());
        for (coin_type, coins) in &coins {
            let mut total_balance = 0i128;
            let mut coin_object_count = 0;
            for (_, coin_info) in coins {
                total_balance += coin_info.balance as i128;
                coin_object_count += 1;
            }
            let coin_type =
                TypeTag::Struct(Box::new(parse_sui_struct_tag(&coin_type).map_err(|e| {
                    SuiErrorKind::ExecutionError(format!(
                        "Failed to parse coin type from coin index key: {:?}",
                        e
                    ))
                })?));
            balances.insert(
                coin_type,
                TotalBalance {
                    num_coins: coin_object_count,
                    balance: total_balance,
                    address_balance: 0,
                },
            );
        }
        Ok(Arc::new(balances))
    }

    fn invalidate_per_coin_type_cache(
        &self,
        keys: impl IntoIterator<Item = (SuiAddress, TypeTag)>,
    ) -> SuiResult {
        self.caches.per_coin_type_balance.batch_invalidate(keys);
        Ok(())
    }

    fn invalidate_all_balance_cache(
        &self,
        addresses: impl IntoIterator<Item = SuiAddress>,
    ) -> SuiResult {
        self.caches.all_balances.batch_invalidate(addresses);
        Ok(())
    }

    fn update_per_coin_type_cache(
        &self,
        keys: impl IntoIterator<Item = ((SuiAddress, TypeTag), SuiResult<TotalBalance>)>,
    ) -> SuiResult {
        self.caches
            .per_coin_type_balance
            .batch_merge(keys, Self::merge_balance);
        Ok(())
    }

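    /// Merges a cached balance with a delta produced by `index_coin`; an error on either
    /// side wins so a failed read is never masked. For example, a cached
    /// `TotalBalance { balance: 700, num_coins: 7, .. }` merged with a delta of
    /// `{ balance: -300, num_coins: -3 }` yields `{ balance: 400, num_coins: 4 }`.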
    fn merge_balance(
        old_balance: &SuiResult<TotalBalance>,
        balance_delta: &SuiResult<TotalBalance>,
    ) -> SuiResult<TotalBalance> {
        if let Ok(old_balance) = old_balance {
            if let Ok(balance_delta) = balance_delta {
                Ok(TotalBalance {
                    balance: old_balance.balance + balance_delta.balance,
                    num_coins: old_balance.num_coins + balance_delta.num_coins,
                    address_balance: old_balance.address_balance,
                })
            } else {
                balance_delta.clone()
            }
        } else {
            old_balance.clone()
        }
    }

    fn update_all_balance_cache(
        &self,
        keys: impl IntoIterator<Item = (SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>)>,
    ) -> SuiResult {
        self.caches
            .all_balances
            .batch_merge(keys, Self::merge_all_balance);
        Ok(())
    }

    fn merge_all_balance(
        old_balance: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
        balance_delta: &SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>,
    ) -> SuiResult<Arc<HashMap<TypeTag, TotalBalance>>> {
        if let Ok(old_balance) = old_balance {
            if let Ok(balance_delta) = balance_delta {
                let mut new_balance = HashMap::new();
                for (key, value) in old_balance.iter() {
                    new_balance.insert(key.clone(), *value);
                }
                for (key, delta) in balance_delta.iter() {
                    let old = new_balance.entry(key.clone()).or_insert(TotalBalance {
                        balance: 0,
                        num_coins: 0,
                        address_balance: 0,
                    });
                    let new_total = TotalBalance {
                        balance: old.balance + delta.balance,
                        num_coins: old.num_coins + delta.num_coins,
                        address_balance: old.address_balance,
                    };
                    new_balance.insert(key.clone(), new_total);
                }
                Ok(Arc::new(new_balance))
            } else {
                balance_delta.clone()
            }
        } else {
            old_balance.clone()
        }
    }
}

#[cfg(test)]
mod tests {
    use super::IndexStore;
    use super::ObjectIndexChanges;
    use move_core_types::account_address::AccountAddress;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use std::env::temp_dir;
    use sui_types::base_types::{ObjectInfo, ObjectType, SuiAddress};
    use sui_types::digests::TransactionDigest;
    use sui_types::effects::TransactionEvents;
    use sui_types::gas_coin::GAS;
    use sui_types::object;
    use sui_types::object::Owner;

    #[tokio::test]
    async fn test_index_cache() -> anyhow::Result<()> {
        let index_store =
            IndexStore::new_without_init(temp_dir(), &Registry::default(), Some(128), false);
        let address: SuiAddress = AccountAddress::random().into();
        let mut written_objects = BTreeMap::new();
        let mut input_objects = BTreeMap::new();
        let mut object_map = BTreeMap::new();

        let mut new_objects = vec![];
        for _i in 0..10 {
            let object = object::Object::new_gas_with_balance_and_owner_for_testing(100, address);
            new_objects.push((
                (address, object.id()),
                ObjectInfo {
                    object_id: object.id(),
                    version: object.version(),
                    digest: object.digest(),
                    type_: ObjectType::Struct(object.type_().unwrap().clone()),
                    owner: Owner::AddressOwner(address),
                    previous_transaction: object.previous_transaction,
                },
            ));
            object_map.insert(object.id(), object.clone());
            written_objects.insert(object.data.id(), object);
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners: new_objects,
            new_dynamic_fields: vec![],
        };

        let tx_coins = (input_objects.clone(), written_objects.clone());
        index_store.index_tx(
            address,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            object_index_changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
            vec![],
        )?;

        let balance_from_db = IndexStore::get_balance_from_db(
            index_store.metrics.clone(),
            index_store.tables.coin_index_2.clone(),
            address,
            GAS::type_tag(),
        )?;
        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        let all_balance = index_store.get_all_coin_object_balances(address)?;
        let balance = all_balance.get(&GAS::type_tag()).unwrap();
        assert_eq!(*balance, balance_from_db);
        assert_eq!(balance.balance, 1000);
        assert_eq!(balance.num_coins, 10);

        written_objects.clear();
        let mut deleted_objects = vec![];
        for (id, object) in object_map.iter().take(3) {
            deleted_objects.push((address, *id));
            input_objects.insert(*id, object.to_owned());
        }
        let object_index_changes = ObjectIndexChanges {
            deleted_owners: deleted_objects.clone(),
            deleted_dynamic_fields: vec![],
            new_owners: vec![],
            new_dynamic_fields: vec![],
        };
        let tx_coins = (input_objects, written_objects);
        index_store.index_tx(
            address,
            vec![].into_iter(),
            vec![].into_iter(),
            vec![].into_iter(),
            &TransactionEvents { data: vec![] },
            object_index_changes,
            &TransactionDigest::random(),
            1234,
            Some(tx_coins),
            vec![],
        )?;
        let balance_from_db = IndexStore::get_balance_from_db(
            index_store.metrics.clone(),
            index_store.tables.coin_index_2.clone(),
            address,
            GAS::type_tag(),
        )?;
        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);
        index_store
            .caches
            .per_coin_type_balance
            .invalidate(&(address, GAS::type_tag()));
        let all_balance = index_store.get_all_coin_object_balances(address)?;
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().balance, 700);
        assert_eq!(all_balance.get(&GAS::type_tag()).unwrap().num_coins, 7);
        let balance = index_store.get_coin_object_balance(address, GAS::type_tag())?;
        assert_eq!(balance, balance_from_db);
        assert_eq!(balance.balance, 700);
        assert_eq!(balance.num_coins, 7);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_transaction_by_move_function() {
        use sui_types::base_types::ObjectID;
        use typed_store::Map;

        let index_store = IndexStore::new(temp_dir(), &Registry::default(), Some(128), false);
        let db = &index_store.tables.transactions_by_move_function;
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "f".to_string(),
                0,
            ),
            &[0; 32].into(),
        )
        .unwrap();
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "Z".repeat(128),
                0,
            ),
            &[1; 32].into(),
        )
        .unwrap();
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "f".repeat(128),
                0,
            ),
            &[2; 32].into(),
        )
        .unwrap();
        db.insert(
            &(
                ObjectID::new([1; 32]),
                "mod".to_string(),
                "z".repeat(128),
                0,
            ),
            &[3; 32].into(),
        )
        .unwrap();

        let mut v = index_store
            .get_transactions_by_move_function(
                ObjectID::new([1; 32]),
                Some("mod".to_string()),
                None,
                None,
                None,
                false,
            )
            .unwrap();
        let v_rev = index_store
            .get_transactions_by_move_function(
                ObjectID::new([1; 32]),
                Some("mod".to_string()),
                None,
                None,
                None,
                true,
            )
            .unwrap();
        v.reverse();
        assert_eq!(v, v_rev);
    }
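
    // Added illustrative test (not part of the original file): documents the
    // inverted-balance ordering invariant that `get_owned_coins_iterator` relies on,
    // namely that larger balances sort before smaller ones for the same owner and
    // coin type.
    #[test]
    fn test_coin_index_key_ordering() {
        use super::CoinIndexKey2;
        use sui_types::base_types::ObjectID;

        let owner = SuiAddress::ZERO;
        let coin_type = "0x2::sui::SUI".to_string();
        let large = CoinIndexKey2::new(owner, coin_type.clone(), 1_000, ObjectID::ZERO);
        let small = CoinIndexKey2::new(owner, coin_type, 10, ObjectID::ZERO);

        // `new` stores `!balance`, so the larger balance produces the smaller key.
        assert!(large.inverted_balance < small.inverted_balance);
        assert!(large < small);
    }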
}