1use bytes::Buf;
15use bytes::BufMut;
16use move_core_types::language_storage::TypeTag;
17use prost::Message;
18use sui_consistent_store::Decode;
19use sui_consistent_store::Encode;
20use sui_consistent_store::Iter;
21use sui_consistent_store::Protobuf;
22use sui_consistent_store::error::DecodeError;
23use sui_consistent_store::error::EncodeError;
24use sui_consistent_store::error::Error;
25use sui_consistent_store::reader::Reader;
26use sui_types::base_types::ObjectID;
27use sui_types::base_types::SuiAddress;
28
29use crate::proto::BalanceDelta;
30
31pub const NAME: &str = "balance";
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct Key {
35 pub owner: SuiAddress,
36 pub coin_type: TypeTag,
37}
38
39pub type Value = Protobuf<BalanceDelta>;
40
41impl Encode for Key {
42 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
43 buf.put_slice(self.owner.as_ref());
44 let type_bytes = bcs::to_bytes(&self.coin_type)
45 .map_err(|e| EncodeError::with_source("bcs encode TypeTag", e))?;
46 buf.put_slice(&type_bytes);
47 Ok(())
48 }
49}
50
51impl Decode for Key {
52 fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
53 if buf.remaining() < ObjectID::LENGTH {
54 return Err(DecodeError::msg(format!(
55 "expected at least {} bytes for {NAME} Key owner, got {}",
56 ObjectID::LENGTH,
57 buf.remaining(),
58 )));
59 }
60 let mut owner_bytes = [0u8; ObjectID::LENGTH];
61 buf.copy_to_slice(&mut owner_bytes);
62 let owner = SuiAddress::from_bytes(owner_bytes)
63 .map_err(|e| DecodeError::with_source("decode SuiAddress", e))?;
64 let remaining = buf.copy_to_bytes(buf.remaining());
65 let coin_type: TypeTag = bcs::from_bytes(&remaining)
66 .map_err(|e| DecodeError::with_source("bcs decode TypeTag", e))?;
67 Ok(Key { owner, coin_type })
68 }
69}
70
71pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
74 let mut opts = resolver.options(NAME);
75 opts.set_merge_operator_associative("balance_merge", merge);
76 opts.set_compaction_filter("balance_compact_zero", compact);
77 opts
78}
79
80pub fn coin_delta(owner: SuiAddress, coin_type: TypeTag, delta: i128) -> (Key, Value) {
84 (
85 Key { owner, coin_type },
86 Protobuf(BalanceDelta {
87 coin: delta.to_le_bytes().to_vec().into(),
88 address: Default::default(),
89 }),
90 )
91}
92
93pub fn address_delta(owner: SuiAddress, coin_type: TypeTag, delta: i128) -> (Key, Value) {
97 (
98 Key { owner, coin_type },
99 Protobuf(BalanceDelta {
100 coin: Default::default(),
101 address: delta.to_le_bytes().to_vec().into(),
102 }),
103 )
104}
105
106pub fn delta(owner: SuiAddress, coin_type: TypeTag, coin: i128, address: i128) -> (Key, Value) {
112 (
113 Key { owner, coin_type },
114 Protobuf(BalanceDelta {
115 coin: coin.to_le_bytes().to_vec().into(),
116 address: address.to_le_bytes().to_vec().into(),
117 }),
118 )
119}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
124pub struct Balance {
125 pub coin: i128,
127 pub address: i128,
129}
130
131impl Balance {
132 pub fn total(&self) -> i128 {
135 self.coin.saturating_add(self.address)
136 }
137
138 pub(crate) fn from_delta(stored: &BalanceDelta) -> Result<Self, Error> {
144 Ok(Self {
145 coin: read_i128_field(&stored.coin, "coin")?,
146 address: read_i128_field(&stored.address, "address")?,
147 })
148 }
149}
150
151pub struct OwnerPrefix(pub SuiAddress);
155
156impl Encode for OwnerPrefix {
157 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
158 buf.put_slice(self.0.as_ref());
159 Ok(())
160 }
161}
162
163impl<R: Reader> super::RpcStoreSchema<R> {
164 pub fn get_balance(
166 &self,
167 owner: SuiAddress,
168 coin_type: TypeTag,
169 ) -> Result<Option<Balance>, Error> {
170 let Some(stored) = self.balance.get(&Key { owner, coin_type })? else {
171 return Ok(None);
172 };
173 Ok(Some(Balance::from_delta(&stored.into_inner())?))
174 }
175
176 pub fn iter_balances_owned_by(&self, owner: SuiAddress) -> Result<Iter<'_, Key, Value>, Error> {
178 self.balance.iter_prefix(&OwnerPrefix(owner))
179 }
180}
181
182fn read_i128_field(bytes: &prost::bytes::Bytes, field: &str) -> Result<i128, Error> {
185 if bytes.is_empty() {
186 return Ok(0);
187 }
188 let array: [u8; 16] = bytes.as_ref().try_into().map_err(|_| {
189 DecodeError::msg(format!(
190 "expected 16 bytes for BalanceDelta.{field}, got {}",
191 bytes.len(),
192 ))
193 })?;
194 Ok(i128::from_le_bytes(array))
195}
196
197fn read_i128_field_or_panic(bytes: &[u8], field: &str) -> i128 {
202 if bytes.is_empty() {
203 return 0;
204 }
205 let array: [u8; 16] = bytes.try_into().unwrap_or_else(|_| {
206 panic!(
207 "expected 16 bytes for BalanceDelta.{field}, got {}",
208 bytes.len(),
209 )
210 });
211 i128::from_le_bytes(array)
212}
213
214fn merge(
222 _key: &[u8],
223 existing_val: Option<&[u8]>,
224 operands: &rocksdb::MergeOperands,
225) -> Option<Vec<u8>> {
226 let mut coin: i128 = 0;
227 let mut address: i128 = 0;
228
229 if let Some(existing) = existing_val {
230 let stored = BalanceDelta::decode(existing).expect("decode existing BalanceDelta");
231 coin = read_i128_field_or_panic(&stored.coin, "coin");
232 address = read_i128_field_or_panic(&stored.address, "address");
233 }
234
235 for operand in operands {
236 let next = BalanceDelta::decode(operand).expect("decode BalanceDelta operand");
237 if !next.coin.is_empty() {
238 coin = coin.saturating_add(read_i128_field_or_panic(&next.coin, "coin"));
239 }
240 if !next.address.is_empty() {
241 address = address.saturating_add(read_i128_field_or_panic(&next.address, "address"));
242 }
243 }
244
245 let merged = BalanceDelta {
246 coin: coin.to_le_bytes().to_vec().into(),
247 address: address.to_le_bytes().to_vec().into(),
248 };
249 Some(merged.encode_to_vec())
250}
251
252fn compact(_level: u32, _key: &[u8], value: &[u8]) -> rocksdb::CompactionDecision {
260 let Ok(stored) = BalanceDelta::decode(value) else {
261 return rocksdb::CompactionDecision::Keep;
262 };
263 let coin = if stored.coin.is_empty() {
264 0
265 } else {
266 read_i128_field_or_panic(&stored.coin, "coin")
267 };
268 let address = if stored.address.is_empty() {
269 0
270 } else {
271 read_i128_field_or_panic(&stored.address, "address")
272 };
273 if coin == 0 && address == 0 {
274 rocksdb::CompactionDecision::Remove
275 } else {
276 rocksdb::CompactionDecision::Keep
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use std::collections::BTreeSet;
283
284 use move_core_types::language_storage::StructTag;
285 use sui_consistent_store::Db;
286 use sui_consistent_store::DbOptions;
287
288 use super::*;
289 use crate::RpcStoreSchema;
290
291 fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
292 let dir = tempfile::tempdir().unwrap();
293 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
294 (dir, db, schema)
295 }
296
297 fn coin_type(name: &str) -> TypeTag {
298 TypeTag::Struct(Box::new(StructTag {
299 address: move_core_types::account_address::AccountAddress::new([2u8; 32]),
300 module: move_core_types::identifier::Identifier::new("coin").unwrap(),
301 name: move_core_types::identifier::Identifier::new(name).unwrap(),
302 type_params: vec![],
303 }))
304 }
305
306 fn owner(b: u8) -> SuiAddress {
307 SuiAddress::from_bytes([b; 32]).unwrap()
308 }
309
310 #[test]
311 fn get_balance_returns_none_for_unknown_owner() {
312 let (_dir, _db, schema) = fresh_db();
313 assert!(
314 schema
315 .get_balance(owner(1), coin_type("SUI"))
316 .unwrap()
317 .is_none()
318 );
319 }
320
321 #[test]
322 fn coin_and_address_deltas_accumulate_independently() {
323 let (_dir, db, schema) = fresh_db();
324 let (k1, v1) = coin_delta(owner(1), coin_type("SUI"), 100);
325 let (k2, v2) = coin_delta(owner(1), coin_type("SUI"), 50);
326 let (k3, v3) = address_delta(owner(1), coin_type("SUI"), 1_000);
327 let (k4, v4) = address_delta(owner(1), coin_type("SUI"), -250);
328
329 let mut batch = db.batch();
330 batch.merge(&schema.balance, &k1, &v1).unwrap();
331 batch.merge(&schema.balance, &k2, &v2).unwrap();
332 batch.merge(&schema.balance, &k3, &v3).unwrap();
333 batch.merge(&schema.balance, &k4, &v4).unwrap();
334 batch.commit().unwrap();
335
336 let balance = schema
337 .get_balance(owner(1), coin_type("SUI"))
338 .unwrap()
339 .expect("balance present");
340 assert_eq!(balance.coin, 150);
341 assert_eq!(balance.address, 750);
342 assert_eq!(balance.total(), 900);
343 }
344
345 #[test]
346 fn distinct_coin_types_do_not_alias() {
347 let (_dir, db, schema) = fresh_db();
348 let mut batch = db.batch();
349 let (k_sui, v_sui) = coin_delta(owner(1), coin_type("SUI"), 100);
350 let (k_usdc, v_usdc) = coin_delta(owner(1), coin_type("USDC"), 999);
351 batch.merge(&schema.balance, &k_sui, &v_sui).unwrap();
352 batch.merge(&schema.balance, &k_usdc, &v_usdc).unwrap();
353 batch.commit().unwrap();
354
355 assert_eq!(
356 schema
357 .get_balance(owner(1), coin_type("SUI"))
358 .unwrap()
359 .unwrap()
360 .coin,
361 100,
362 );
363 assert_eq!(
364 schema
365 .get_balance(owner(1), coin_type("USDC"))
366 .unwrap()
367 .unwrap()
368 .coin,
369 999,
370 );
371 }
372
373 #[test]
374 fn negative_then_positive_zero_out() {
375 let (_dir, db, schema) = fresh_db();
376 let (k1, v1) = coin_delta(owner(1), coin_type("SUI"), 100);
377 let (k2, v2) = coin_delta(owner(1), coin_type("SUI"), -100);
378 let mut batch = db.batch();
379 batch.merge(&schema.balance, &k1, &v1).unwrap();
380 batch.merge(&schema.balance, &k2, &v2).unwrap();
381 batch.commit().unwrap();
382
383 let balance = schema
384 .get_balance(owner(1), coin_type("SUI"))
385 .unwrap()
386 .expect("row still present pre-compaction");
387 assert_eq!(balance.coin, 0);
388 assert_eq!(balance.address, 0);
389 assert_eq!(balance.total(), 0);
390 }
391
392 #[test]
393 fn saturating_add_protects_against_i128_overflow() {
394 let (_dir, db, schema) = fresh_db();
395 let (k1, v1) = coin_delta(owner(1), coin_type("SUI"), i128::MAX);
396 let (k2, v2) = coin_delta(owner(1), coin_type("SUI"), 1);
397 let mut batch = db.batch();
398 batch.merge(&schema.balance, &k1, &v1).unwrap();
399 batch.merge(&schema.balance, &k2, &v2).unwrap();
400 batch.commit().unwrap();
401
402 let balance = schema
403 .get_balance(owner(1), coin_type("SUI"))
404 .unwrap()
405 .expect("balance present");
406 assert_eq!(balance.coin, i128::MAX);
407 }
408
409 #[test]
410 fn iter_balances_walks_only_target_owner() {
411 let (_dir, db, schema) = fresh_db();
412 let target_types: BTreeSet<TypeTag> =
413 [coin_type("SUI"), coin_type("USDC")].into_iter().collect();
414
415 let mut batch = db.batch();
416 for t in &target_types {
417 let (k, v) = coin_delta(owner(1), t.clone(), 100);
418 batch.merge(&schema.balance, &k, &v).unwrap();
419 }
420 let (k_other, v_other) = coin_delta(owner(2), coin_type("SUI"), 500);
422 batch.merge(&schema.balance, &k_other, &v_other).unwrap();
423 batch.commit().unwrap();
424
425 let found: BTreeSet<TypeTag> = schema
426 .iter_balances_owned_by(owner(1))
427 .unwrap()
428 .map(|res| res.unwrap().0.coin_type)
429 .collect();
430 assert_eq!(found, target_types);
431 }
432}