sui_indexer/handlers/
tx_processor.rs1use std::collections::HashMap;
5
6use async_trait::async_trait;
7use sui_json_rpc::ObjectProvider;
8use sui_json_rpc::get_balance_changes_from_effect;
9use sui_json_rpc::get_object_changes;
10use sui_types::base_types::ObjectID;
11use sui_types::base_types::SequenceNumber;
12use sui_types::digests::TransactionDigest;
13use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
14use sui_types::full_checkpoint_content::CheckpointData;
15use sui_types::object::Object;
16use sui_types::transaction::{TransactionData, TransactionDataAPI};
17
18use crate::errors::IndexerError;
19use crate::metrics::IndexerMetrics;
20use crate::types::{IndexedObjectChange, IndexerResult};
21
22pub struct InMemObjectCache {
23 id_map: HashMap<ObjectID, Object>,
24 seq_map: HashMap<(ObjectID, SequenceNumber), Object>,
25}
26
27impl InMemObjectCache {
28 pub fn new() -> Self {
29 Self {
30 id_map: HashMap::new(),
31 seq_map: HashMap::new(),
32 }
33 }
34
35 pub fn insert_object(&mut self, obj: Object) {
36 self.id_map.insert(obj.id(), obj.clone());
37 self.seq_map.insert((obj.id(), obj.version()), obj);
38 }
39
40 pub fn get(&self, id: &ObjectID, version: Option<&SequenceNumber>) -> Option<&Object> {
41 if let Some(version) = version {
42 self.seq_map.get(&(*id, *version))
43 } else {
44 self.id_map.get(id)
45 }
46 }
47}
48
49impl Default for InMemObjectCache {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55pub struct TxChangesProcessor {
59 object_cache: InMemObjectCache,
60 metrics: IndexerMetrics,
61}
62
63impl TxChangesProcessor {
64 pub fn new(objects: &[&Object], metrics: IndexerMetrics) -> Self {
65 let mut object_cache = InMemObjectCache::new();
66 for obj in objects {
67 object_cache.insert_object(<&Object>::clone(obj).clone());
68 }
69 Self {
70 object_cache,
71 metrics,
72 }
73 }
74
75 pub(crate) async fn get_changes(
76 &self,
77 tx: &TransactionData,
78 effects: &TransactionEffects,
79 tx_digest: &TransactionDigest,
80 ) -> IndexerResult<(
81 Vec<sui_json_rpc_types::BalanceChange>,
82 Vec<IndexedObjectChange>,
83 )> {
84 let _timer = self
85 .metrics
86 .indexing_tx_object_changes_latency
87 .start_timer();
88 let object_change: Vec<_> = get_object_changes(
89 self,
90 effects,
91 tx.sender(),
92 effects.modified_at_versions(),
93 effects.all_changed_objects(),
94 effects.all_removed_objects(),
95 )
96 .await?
97 .into_iter()
98 .map(IndexedObjectChange::from)
99 .collect();
100 let balance_change = get_balance_changes_from_effect(
101 self,
102 effects,
103 tx.input_objects().unwrap_or_else(|e| {
104 panic!(
105 "Checkpointed tx {:?} has invalid input objects: {e}",
106 tx_digest,
107 )
108 }),
109 None,
110 )
111 .await?;
112 Ok((balance_change, object_change))
113 }
114}
115
116#[async_trait]
117impl ObjectProvider for TxChangesProcessor {
118 type Error = IndexerError;
119
120 async fn get_object(
121 &self,
122 id: &ObjectID,
123 version: &SequenceNumber,
124 ) -> Result<Object, Self::Error> {
125 let object = self
126 .object_cache
127 .get(id, Some(version))
128 .as_ref()
129 .map(|o| <&Object>::clone(o).clone());
130 if let Some(o) = object {
131 self.metrics.indexing_get_object_in_mem_hit.inc();
132 return Ok(o);
133 }
134
135 panic!(
136 "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn get_object)",
137 id
138 );
139 }
140
141 async fn find_object_lt_or_eq_version(
142 &self,
143 id: &ObjectID,
144 version: &SequenceNumber,
145 ) -> Result<Option<Object>, Self::Error> {
146 let object = self
148 .object_cache
149 .get(id, Some(version))
150 .as_ref()
151 .map(|o| <&Object>::clone(o).clone());
152 if let Some(o) = object {
153 self.metrics.indexing_get_object_in_mem_hit.inc();
154 return Ok(Some(o));
155 }
156
157 let object = self
161 .object_cache
162 .get(id, None)
163 .as_ref()
164 .map(|o| <&Object>::clone(o).clone());
165 if let Some(o) = object {
166 if o.version() > *version {
167 panic!(
168 "Found a higher version {} for object {}, expected lt_or_eq {}",
169 o.version(),
170 id,
171 *version
172 );
173 }
174 if o.version() <= *version {
175 self.metrics.indexing_get_object_in_mem_hit.inc();
176 return Ok(Some(o));
177 }
178 }
179
180 panic!(
181 "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn find_object_lt_or_eq_version)",
182 id
183 );
184 }
185}
186
187pub(crate) struct EpochEndIndexingObjectStore<'a> {
190 objects: Vec<&'a Object>,
191}
192
193impl<'a> EpochEndIndexingObjectStore<'a> {
194 pub fn new(data: &'a CheckpointData) -> Self {
195 Self {
196 objects: data.latest_live_output_objects(),
197 }
198 }
199}
200
201impl sui_types::storage::ObjectStore for EpochEndIndexingObjectStore<'_> {
202 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
203 self.objects
204 .iter()
205 .find(|o| o.id() == *object_id)
206 .cloned()
207 .cloned()
208 }
209
210 fn get_object_by_key(
211 &self,
212 object_id: &ObjectID,
213 version: sui_types::base_types::VersionNumber,
214 ) -> Option<Object> {
215 self.objects
216 .iter()
217 .find(|o| o.id() == *object_id && o.version() == version)
218 .cloned()
219 .cloned()
220 }
221}