sui_indexer/handlers/
tx_processor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5
6use async_trait::async_trait;
7use sui_json_rpc::ObjectProvider;
8use sui_json_rpc::get_balance_changes_from_effect;
9use sui_json_rpc::get_object_changes;
10use sui_types::base_types::ObjectID;
11use sui_types::base_types::SequenceNumber;
12use sui_types::digests::TransactionDigest;
13use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
14use sui_types::full_checkpoint_content::CheckpointData;
15use sui_types::object::Object;
16use sui_types::transaction::{TransactionData, TransactionDataAPI};
17
18use crate::errors::IndexerError;
19use crate::metrics::IndexerMetrics;
20use crate::types::{IndexedObjectChange, IndexerResult};
21
22pub struct InMemObjectCache {
23    id_map: HashMap<ObjectID, Object>,
24    seq_map: HashMap<(ObjectID, SequenceNumber), Object>,
25}
26
27impl InMemObjectCache {
28    pub fn new() -> Self {
29        Self {
30            id_map: HashMap::new(),
31            seq_map: HashMap::new(),
32        }
33    }
34
35    pub fn insert_object(&mut self, obj: Object) {
36        self.id_map.insert(obj.id(), obj.clone());
37        self.seq_map.insert((obj.id(), obj.version()), obj);
38    }
39
40    pub fn get(&self, id: &ObjectID, version: Option<&SequenceNumber>) -> Option<&Object> {
41        if let Some(version) = version {
42            self.seq_map.get(&(*id, *version))
43        } else {
44            self.id_map.get(id)
45        }
46    }
47}
48
49impl Default for InMemObjectCache {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55/// Along with InMemObjectCache, TxChangesProcessor implements ObjectProvider
56/// so it can be used in indexing write path to get object/balance changes.
57/// Its lifetime is per checkpoint.
58pub struct TxChangesProcessor {
59    object_cache: InMemObjectCache,
60    metrics: IndexerMetrics,
61}
62
63impl TxChangesProcessor {
64    pub fn new(objects: &[&Object], metrics: IndexerMetrics) -> Self {
65        let mut object_cache = InMemObjectCache::new();
66        for obj in objects {
67            object_cache.insert_object(<&Object>::clone(obj).clone());
68        }
69        Self {
70            object_cache,
71            metrics,
72        }
73    }
74
75    pub(crate) async fn get_changes(
76        &self,
77        tx: &TransactionData,
78        effects: &TransactionEffects,
79        tx_digest: &TransactionDigest,
80    ) -> IndexerResult<(
81        Vec<sui_json_rpc_types::BalanceChange>,
82        Vec<IndexedObjectChange>,
83    )> {
84        let _timer = self
85            .metrics
86            .indexing_tx_object_changes_latency
87            .start_timer();
88        let object_change: Vec<_> = get_object_changes(
89            self,
90            effects,
91            tx.sender(),
92            effects.modified_at_versions(),
93            effects.all_changed_objects(),
94            effects.all_removed_objects(),
95        )
96        .await?
97        .into_iter()
98        .map(IndexedObjectChange::from)
99        .collect();
100        let balance_change = get_balance_changes_from_effect(
101            self,
102            effects,
103            tx.input_objects().unwrap_or_else(|e| {
104                panic!(
105                    "Checkpointed tx {:?} has invalid input objects: {e}",
106                    tx_digest,
107                )
108            }),
109            None,
110        )
111        .await?;
112        Ok((balance_change, object_change))
113    }
114}
115
116#[async_trait]
117impl ObjectProvider for TxChangesProcessor {
118    type Error = IndexerError;
119
120    async fn get_object(
121        &self,
122        id: &ObjectID,
123        version: &SequenceNumber,
124    ) -> Result<Object, Self::Error> {
125        let object = self
126            .object_cache
127            .get(id, Some(version))
128            .as_ref()
129            .map(|o| <&Object>::clone(o).clone());
130        if let Some(o) = object {
131            self.metrics.indexing_get_object_in_mem_hit.inc();
132            return Ok(o);
133        }
134
135        panic!(
136            "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn get_object)",
137            id
138        );
139    }
140
141    async fn find_object_lt_or_eq_version(
142        &self,
143        id: &ObjectID,
144        version: &SequenceNumber,
145    ) -> Result<Option<Object>, Self::Error> {
146        // First look up the exact version in object_cache.
147        let object = self
148            .object_cache
149            .get(id, Some(version))
150            .as_ref()
151            .map(|o| <&Object>::clone(o).clone());
152        if let Some(o) = object {
153            self.metrics.indexing_get_object_in_mem_hit.inc();
154            return Ok(Some(o));
155        }
156
157        // Second look up the latest version in object_cache. This may be
158        // called when the object is deleted hence the version at deletion
159        // is given.
160        let object = self
161            .object_cache
162            .get(id, None)
163            .as_ref()
164            .map(|o| <&Object>::clone(o).clone());
165        if let Some(o) = object {
166            if o.version() > *version {
167                panic!(
168                    "Found a higher version {} for object {}, expected lt_or_eq {}",
169                    o.version(),
170                    id,
171                    *version
172                );
173            }
174            if o.version() <= *version {
175                self.metrics.indexing_get_object_in_mem_hit.inc();
176                return Ok(Some(o));
177            }
178        }
179
180        panic!(
181            "Object {} is not found in TxChangesProcessor as an ObjectProvider (fn find_object_lt_or_eq_version)",
182            id
183        );
184    }
185}
186
187// This is a struct that is used to extract SuiSystemState and its dynamic children
188// for end-of-epoch indexing.
189pub(crate) struct EpochEndIndexingObjectStore<'a> {
190    objects: Vec<&'a Object>,
191}
192
193impl<'a> EpochEndIndexingObjectStore<'a> {
194    pub fn new(data: &'a CheckpointData) -> Self {
195        Self {
196            objects: data.latest_live_output_objects(),
197        }
198    }
199}
200
201impl sui_types::storage::ObjectStore for EpochEndIndexingObjectStore<'_> {
202    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
203        self.objects
204            .iter()
205            .find(|o| o.id() == *object_id)
206            .cloned()
207            .cloned()
208    }
209
210    fn get_object_by_key(
211        &self,
212        object_id: &ObjectID,
213        version: sui_types::base_types::VersionNumber,
214    ) -> Option<Object> {
215        self.objects
216            .iter()
217            .find(|o| o.id() == *object_id && o.version() == version)
218            .cloned()
219            .cloned()
220    }
221}