sui_rpc_store/indexer/
balance.rs1use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use move_core_types::language_storage::TypeTag;
25use sui_consistent_store::Batch;
26use sui_consistent_store::Restore;
27use sui_indexer_alt_framework::pipeline::Processor;
28use sui_indexer_alt_framework::pipeline::sequential;
29use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
30use sui_types::accumulator_root::AccumulatorKey;
31use sui_types::accumulator_root::AccumulatorValue;
32use sui_types::balance_change::derive_detailed_balance_changes_2;
33use sui_types::base_types::SuiAddress;
34use sui_types::coin::Coin;
35use sui_types::full_checkpoint_content::Checkpoint;
36use sui_types::object::Object;
37use sui_types::object::Owner;
38
39use crate::RpcStoreSchema;
40use crate::indexer::Schema;
41use crate::indexer::Store;
42use crate::schema::balance;
43use crate::schema::balance::Key;
44
45pub struct Balance;
47
48#[derive(Debug)]
49pub struct Delta {
50 pub owner: SuiAddress,
51 pub coin_type: TypeTag,
52 pub coin: i128,
55 pub address: i128,
58}
59
60#[async_trait]
61impl Processor for Balance {
62 const NAME: &'static str = "balance";
63 type Value = Delta;
64
65 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Delta>> {
66 let mut deltas = Vec::new();
67 for tx in &checkpoint.transactions {
68 for change in derive_detailed_balance_changes_2(&tx.effects, &checkpoint.object_set) {
69 deltas.push(Delta {
70 owner: change.address,
71 coin_type: change.coin_type,
72 coin: change.coin_amount,
73 address: change.address_amount,
74 });
75 }
76 }
77 Ok(deltas)
78 }
79}
80
81impl Restore for Balance {
82 type Schema = RpcStoreSchema;
83
84 fn restore(
104 &self,
105 schema: &Self::Schema,
106 object: &Object,
107 batch: &mut Batch,
108 ) -> anyhow::Result<()> {
109 match object.owner() {
110 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
111 if let Some((coin_type, value)) = coin_balance_for_restore(object)? {
112 let (key, val) = balance::delta(*owner, coin_type, value as i128, 0);
113 batch.merge(&schema.balance, &key, &val)?;
114 }
115 }
116 Owner::ObjectOwner(parent) if *parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into() => {
117 if let Some((owner, coin_type, balance_value)) = address_balance_info(object) {
118 let (key, val) = balance::delta(owner, coin_type, 0, balance_value);
119 batch.merge(&schema.balance, &key, &val)?;
120 }
121 }
122 _ => {}
123 }
124 Ok(())
125 }
126}
127
128fn coin_balance_for_restore(object: &Object) -> anyhow::Result<Option<(TypeTag, u64)>> {
133 Ok(Coin::extract_balance_if_coin(object)
134 .map_err(|e| anyhow::anyhow!("Failed to deserialize coin object {}: {e}", object.id()))?
135 .and_then(|(type_, value)| match type_ {
136 TypeTag::Struct(struct_tag) => Some((TypeTag::Struct(struct_tag), value)),
137 _ => None,
138 }))
139}
140
141fn address_balance_info(object: &Object) -> Option<(SuiAddress, TypeTag, i128)> {
147 let move_object = object.data.try_as_move()?;
148 let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
149 else {
150 return None;
151 };
152 let (key, value): (AccumulatorKey, AccumulatorValue) = move_object.try_into().ok()?;
153 let balance_value = value.as_u128()? as i128;
154 if balance_value <= 0 {
155 return None;
156 }
157 Some((key.owner, TypeTag::Struct(coin_type), balance_value))
158}
159
160#[async_trait]
161impl sequential::Handler for Balance {
162 type Store = Store;
163 type Batch = HashMap<Key, (i128, i128)>;
167
168 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Delta>) {
169 for d in values {
170 let entry = batch
171 .entry(Key {
172 owner: d.owner,
173 coin_type: d.coin_type,
174 })
175 .or_insert((0, 0));
176 entry.0 = entry.0.saturating_add(d.coin);
177 entry.1 = entry.1.saturating_add(d.address);
178 }
179 }
180
181 async fn commit<'a>(
182 &self,
183 batch: &Self::Batch,
184 conn: &mut sui_consistent_store::Connection<'a, Schema>,
185 ) -> anyhow::Result<usize> {
186 let cf = &conn.store.schema().balance;
187 for (key, (coin, address)) in batch {
188 let (_, value) = balance::delta(key.owner, key.coin_type.clone(), *coin, *address);
189 conn.batch.merge(cf, key, &value)?;
190 }
191 Ok(batch.len())
192 }
193}
194
195#[cfg(test)]
196mod tests {
197 use std::sync::Arc;
198
199 use sui_consistent_store::Db;
200 use sui_consistent_store::DbOptions;
201 use sui_types::base_types::ObjectID;
202 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
203
204 use super::*;
205
206 #[tokio::test]
207 async fn process_runs_against_synthetic_checkpoint() {
208 let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
209 let _ = Balance.process(&checkpoint).await.unwrap();
210 }
211
212 #[test]
213 fn restore_credits_coin_half_for_address_owned_gas_coin() {
214 let dir = tempfile::tempdir().unwrap();
215 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
216
217 let owner = SuiAddress::ZERO;
218 let coin = Object::with_id_owner_gas_for_testing(ObjectID::from_single_byte(5), owner, 42);
219 let coin_type = coin.coin_type_maybe().unwrap();
220
221 let mut batch = db.batch();
222 Balance.restore(&schema, &coin, &mut batch).unwrap();
223 batch.commit().unwrap();
224
225 let balance = schema
226 .get_balance(owner, coin_type)
227 .unwrap()
228 .expect("balance row present");
229 assert_eq!(balance.coin, 42);
230 assert_eq!(balance.address, 0);
234 }
235
236 #[test]
237 fn restore_skips_non_coin_objects() {
238 let dir = tempfile::tempdir().unwrap();
239 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
240
241 let owner = SuiAddress::ZERO;
242 let non_coin = Object::with_id_owner_for_testing(ObjectID::from_single_byte(9), owner);
243
244 let mut batch = db.batch();
245 Balance.restore(&schema, &non_coin, &mut batch).unwrap();
246 batch.commit().unwrap();
247 }
252}