sui_rpc_store/schema/
balance.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `(owner, coin_type)` → `BalanceDelta`.
5//!
6//! Each row holds two independent `i128` accumulators: `coin` from
7//! the owned-`Coin<T>`-object pipeline and `address` from the
8//! accumulator-balance pipeline. Both pipelines stage merge
9//! operands carrying only their own field; the merge operator
10//! sums each field component-wise with saturation, and the
11//! compaction filter drops rows where both components are zero so
12//! a fully cancelled balance doesn't linger on disk.
13
14use bytes::Buf;
15use bytes::BufMut;
16use move_core_types::language_storage::TypeTag;
17use prost::Message;
18use sui_consistent_store::Decode;
19use sui_consistent_store::Encode;
20use sui_consistent_store::Iter;
21use sui_consistent_store::Protobuf;
22use sui_consistent_store::error::DecodeError;
23use sui_consistent_store::error::EncodeError;
24use sui_consistent_store::error::Error;
25use sui_consistent_store::reader::Reader;
26use sui_types::base_types::ObjectID;
27use sui_types::base_types::SuiAddress;
28
29use crate::proto::BalanceDelta;
30
31pub const NAME: &str = "balance";
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct Key {
35    pub owner: SuiAddress,
36    pub coin_type: TypeTag,
37}
38
39pub type Value = Protobuf<BalanceDelta>;
40
41impl Encode for Key {
42    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
43        buf.put_slice(self.owner.as_ref());
44        let type_bytes = bcs::to_bytes(&self.coin_type)
45            .map_err(|e| EncodeError::with_source("bcs encode TypeTag", e))?;
46        buf.put_slice(&type_bytes);
47        Ok(())
48    }
49}
50
51impl Decode for Key {
52    fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
53        if buf.remaining() < ObjectID::LENGTH {
54            return Err(DecodeError::msg(format!(
55                "expected at least {} bytes for {NAME} Key owner, got {}",
56                ObjectID::LENGTH,
57                buf.remaining(),
58            )));
59        }
60        let mut owner_bytes = [0u8; ObjectID::LENGTH];
61        buf.copy_to_slice(&mut owner_bytes);
62        let owner = SuiAddress::from_bytes(owner_bytes)
63            .map_err(|e| DecodeError::with_source("decode SuiAddress", e))?;
64        let remaining = buf.copy_to_bytes(buf.remaining());
65        let coin_type: TypeTag = bcs::from_bytes(&remaining)
66            .map_err(|e| DecodeError::with_source("bcs decode TypeTag", e))?;
67        Ok(Key { owner, coin_type })
68    }
69}
70
71/// CF options: install the field-wise i128 merge operator and the
72/// drop-when-zero compaction filter.
73pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
74    let mut opts = resolver.options(NAME);
75    opts.set_merge_operator_associative("balance_merge", merge);
76    opts.set_compaction_filter("balance_compact_zero", compact);
77    opts
78}
79
80/// Build a `(Key, Value)` pair representing a coin-side delta —
81/// the change in coin balance for `(owner, coin_type)` due to a
82/// `Coin<T>` create / transfer / destroy event.
83pub fn coin_delta(owner: SuiAddress, coin_type: TypeTag, delta: i128) -> (Key, Value) {
84    (
85        Key { owner, coin_type },
86        Protobuf(BalanceDelta {
87            coin: delta.to_le_bytes().to_vec().into(),
88            address: Default::default(),
89        }),
90    )
91}
92
93/// Build a `(Key, Value)` pair representing an accumulator-side
94/// delta — the change in address-balance for `(owner, coin_type)`
95/// observed via the accumulator-bucket pipeline.
96pub fn address_delta(owner: SuiAddress, coin_type: TypeTag, delta: i128) -> (Key, Value) {
97    (
98        Key { owner, coin_type },
99        Protobuf(BalanceDelta {
100            coin: Default::default(),
101            address: delta.to_le_bytes().to_vec().into(),
102        }),
103    )
104}
105
106/// Build a `(Key, Value)` pair representing both sides of the
107/// balance change for `(owner, coin_type)` in a single merge
108/// operand. Either field may be zero; the merge operator's
109/// field-wise sum makes a zero contribution a no-op against the
110/// accumulator.
111pub fn delta(owner: SuiAddress, coin_type: TypeTag, coin: i128, address: i128) -> (Key, Value) {
112    (
113        Key { owner, coin_type },
114        Protobuf(BalanceDelta {
115            coin: coin.to_le_bytes().to_vec().into(),
116            address: address.to_le_bytes().to_vec().into(),
117        }),
118    )
119}
120
121/// Caller-facing view of one balance row, decomposed into its
122/// two contributing sources.
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
124pub struct Balance {
125    /// Aggregated change from owned `Coin<T>` objects.
126    pub coin: i128,
127    /// Aggregated change from the accumulator-bucket pipeline.
128    pub address: i128,
129}
130
131impl Balance {
132    /// Total balance the caller should display: the saturating
133    /// sum of the two components.
134    pub fn total(&self) -> i128 {
135        self.coin.saturating_add(self.address)
136    }
137
138    /// Decode a stored [`BalanceDelta`] into the typed view. The
139    /// single decode path shared by every reader so the two `i128`
140    /// fields are interpreted — and malformed payloads rejected —
141    /// identically whether the row is fetched by point lookup or
142    /// surfaced through iteration.
143    pub(crate) fn from_delta(stored: &BalanceDelta) -> Result<Self, Error> {
144        Ok(Self {
145            coin: read_i128_field(&stored.coin, "coin")?,
146            address: read_i128_field(&stored.address, "address")?,
147        })
148    }
149}
150
151/// Prefix encoder for "all balances of `owner`". The prefix is the
152/// 32 raw owner bytes — the leading bytes of any `Key` whose
153/// `owner` matches.
154pub struct OwnerPrefix(pub SuiAddress);
155
156impl Encode for OwnerPrefix {
157    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
158        buf.put_slice(self.0.as_ref());
159        Ok(())
160    }
161}
162
163impl<R: Reader> super::RpcStoreSchema<R> {
164    /// Look up the aggregated balance for `(owner, coin_type)`.
165    pub fn get_balance(
166        &self,
167        owner: SuiAddress,
168        coin_type: TypeTag,
169    ) -> Result<Option<Balance>, Error> {
170        let Some(stored) = self.balance.get(&Key { owner, coin_type })? else {
171            return Ok(None);
172        };
173        Ok(Some(Balance::from_delta(&stored.into_inner())?))
174    }
175
176    /// Iterate every coin type that `owner` has a balance in.
177    pub fn iter_balances_owned_by(&self, owner: SuiAddress) -> Result<Iter<'_, Key, Value>, Error> {
178        self.balance.iter_prefix(&OwnerPrefix(owner))
179    }
180}
181
182/// Decode an `i128` from one of the proto's `bytes` fields, with
183/// an empty payload treated as zero.
184fn read_i128_field(bytes: &prost::bytes::Bytes, field: &str) -> Result<i128, Error> {
185    if bytes.is_empty() {
186        return Ok(0);
187    }
188    let array: [u8; 16] = bytes.as_ref().try_into().map_err(|_| {
189        DecodeError::msg(format!(
190            "expected 16 bytes for BalanceDelta.{field}, got {}",
191            bytes.len(),
192        ))
193    })?;
194    Ok(i128::from_le_bytes(array))
195}
196
197/// Read an `i128` from a raw 16-byte LE field, treating an empty
198/// payload as zero. Panics if the payload is not 0 or 16 bytes —
199/// only the merge operator and compaction filter call this, and
200/// neither has access to the structured `Error` machinery.
201fn read_i128_field_or_panic(bytes: &[u8], field: &str) -> i128 {
202    if bytes.is_empty() {
203        return 0;
204    }
205    let array: [u8; 16] = bytes.try_into().unwrap_or_else(|_| {
206        panic!(
207            "expected 16 bytes for BalanceDelta.{field}, got {}",
208            bytes.len(),
209        )
210    });
211    i128::from_le_bytes(array)
212}
213
214/// Associative merge: sum the two components independently with
215/// saturating-on-overflow semantics.
216///
217/// Encode / decode failures panic — this CF is written only by
218/// the crate's `coin_delta` / `address_delta` helpers, so a parse
219/// failure indicates corruption rather than a recoverable
220/// situation.
221fn merge(
222    _key: &[u8],
223    existing_val: Option<&[u8]>,
224    operands: &rocksdb::MergeOperands,
225) -> Option<Vec<u8>> {
226    let mut coin: i128 = 0;
227    let mut address: i128 = 0;
228
229    if let Some(existing) = existing_val {
230        let stored = BalanceDelta::decode(existing).expect("decode existing BalanceDelta");
231        coin = read_i128_field_or_panic(&stored.coin, "coin");
232        address = read_i128_field_or_panic(&stored.address, "address");
233    }
234
235    for operand in operands {
236        let next = BalanceDelta::decode(operand).expect("decode BalanceDelta operand");
237        if !next.coin.is_empty() {
238            coin = coin.saturating_add(read_i128_field_or_panic(&next.coin, "coin"));
239        }
240        if !next.address.is_empty() {
241            address = address.saturating_add(read_i128_field_or_panic(&next.address, "address"));
242        }
243    }
244
245    let merged = BalanceDelta {
246        coin: coin.to_le_bytes().to_vec().into(),
247        address: address.to_le_bytes().to_vec().into(),
248    };
249    Some(merged.encode_to_vec())
250}
251
252/// Compaction filter: drop rows whose two components are both
253/// zero (so an account that's been emptied doesn't keep a
254/// gravestone entry).
255///
256/// A row whose payload doesn't decode is kept rather than
257/// dropped: better to surface a corruption signal at read time
258/// than to silently discard it during compaction.
259fn compact(_level: u32, _key: &[u8], value: &[u8]) -> rocksdb::CompactionDecision {
260    let Ok(stored) = BalanceDelta::decode(value) else {
261        return rocksdb::CompactionDecision::Keep;
262    };
263    let coin = if stored.coin.is_empty() {
264        0
265    } else {
266        read_i128_field_or_panic(&stored.coin, "coin")
267    };
268    let address = if stored.address.is_empty() {
269        0
270    } else {
271        read_i128_field_or_panic(&stored.address, "address")
272    };
273    if coin == 0 && address == 0 {
274        rocksdb::CompactionDecision::Remove
275    } else {
276        rocksdb::CompactionDecision::Keep
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use std::collections::BTreeSet;
283
284    use move_core_types::language_storage::StructTag;
285    use sui_consistent_store::Db;
286    use sui_consistent_store::DbOptions;
287
288    use super::*;
289    use crate::RpcStoreSchema;
290
291    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
292        let dir = tempfile::tempdir().unwrap();
293        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
294        (dir, db, schema)
295    }
296
297    fn coin_type(name: &str) -> TypeTag {
298        TypeTag::Struct(Box::new(StructTag {
299            address: move_core_types::account_address::AccountAddress::new([2u8; 32]),
300            module: move_core_types::identifier::Identifier::new("coin").unwrap(),
301            name: move_core_types::identifier::Identifier::new(name).unwrap(),
302            type_params: vec![],
303        }))
304    }
305
306    fn owner(b: u8) -> SuiAddress {
307        SuiAddress::from_bytes([b; 32]).unwrap()
308    }
309
310    #[test]
311    fn get_balance_returns_none_for_unknown_owner() {
312        let (_dir, _db, schema) = fresh_db();
313        assert!(
314            schema
315                .get_balance(owner(1), coin_type("SUI"))
316                .unwrap()
317                .is_none()
318        );
319    }
320
321    #[test]
322    fn coin_and_address_deltas_accumulate_independently() {
323        let (_dir, db, schema) = fresh_db();
324        let (k1, v1) = coin_delta(owner(1), coin_type("SUI"), 100);
325        let (k2, v2) = coin_delta(owner(1), coin_type("SUI"), 50);
326        let (k3, v3) = address_delta(owner(1), coin_type("SUI"), 1_000);
327        let (k4, v4) = address_delta(owner(1), coin_type("SUI"), -250);
328
329        let mut batch = db.batch();
330        batch.merge(&schema.balance, &k1, &v1).unwrap();
331        batch.merge(&schema.balance, &k2, &v2).unwrap();
332        batch.merge(&schema.balance, &k3, &v3).unwrap();
333        batch.merge(&schema.balance, &k4, &v4).unwrap();
334        batch.commit().unwrap();
335
336        let balance = schema
337            .get_balance(owner(1), coin_type("SUI"))
338            .unwrap()
339            .expect("balance present");
340        assert_eq!(balance.coin, 150);
341        assert_eq!(balance.address, 750);
342        assert_eq!(balance.total(), 900);
343    }
344
345    #[test]
346    fn distinct_coin_types_do_not_alias() {
347        let (_dir, db, schema) = fresh_db();
348        let mut batch = db.batch();
349        let (k_sui, v_sui) = coin_delta(owner(1), coin_type("SUI"), 100);
350        let (k_usdc, v_usdc) = coin_delta(owner(1), coin_type("USDC"), 999);
351        batch.merge(&schema.balance, &k_sui, &v_sui).unwrap();
352        batch.merge(&schema.balance, &k_usdc, &v_usdc).unwrap();
353        batch.commit().unwrap();
354
355        assert_eq!(
356            schema
357                .get_balance(owner(1), coin_type("SUI"))
358                .unwrap()
359                .unwrap()
360                .coin,
361            100,
362        );
363        assert_eq!(
364            schema
365                .get_balance(owner(1), coin_type("USDC"))
366                .unwrap()
367                .unwrap()
368                .coin,
369            999,
370        );
371    }
372
373    #[test]
374    fn negative_then_positive_zero_out() {
375        let (_dir, db, schema) = fresh_db();
376        let (k1, v1) = coin_delta(owner(1), coin_type("SUI"), 100);
377        let (k2, v2) = coin_delta(owner(1), coin_type("SUI"), -100);
378        let mut batch = db.batch();
379        batch.merge(&schema.balance, &k1, &v1).unwrap();
380        batch.merge(&schema.balance, &k2, &v2).unwrap();
381        batch.commit().unwrap();
382
383        let balance = schema
384            .get_balance(owner(1), coin_type("SUI"))
385            .unwrap()
386            .expect("row still present pre-compaction");
387        assert_eq!(balance.coin, 0);
388        assert_eq!(balance.address, 0);
389        assert_eq!(balance.total(), 0);
390    }
391
392    #[test]
393    fn saturating_add_protects_against_i128_overflow() {
394        let (_dir, db, schema) = fresh_db();
395        let (k1, v1) = coin_delta(owner(1), coin_type("SUI"), i128::MAX);
396        let (k2, v2) = coin_delta(owner(1), coin_type("SUI"), 1);
397        let mut batch = db.batch();
398        batch.merge(&schema.balance, &k1, &v1).unwrap();
399        batch.merge(&schema.balance, &k2, &v2).unwrap();
400        batch.commit().unwrap();
401
402        let balance = schema
403            .get_balance(owner(1), coin_type("SUI"))
404            .unwrap()
405            .expect("balance present");
406        assert_eq!(balance.coin, i128::MAX);
407    }
408
409    #[test]
410    fn iter_balances_walks_only_target_owner() {
411        let (_dir, db, schema) = fresh_db();
412        let target_types: BTreeSet<TypeTag> =
413            [coin_type("SUI"), coin_type("USDC")].into_iter().collect();
414
415        let mut batch = db.batch();
416        for t in &target_types {
417            let (k, v) = coin_delta(owner(1), t.clone(), 100);
418            batch.merge(&schema.balance, &k, &v).unwrap();
419        }
420        // Unrelated owner — must not appear.
421        let (k_other, v_other) = coin_delta(owner(2), coin_type("SUI"), 500);
422        batch.merge(&schema.balance, &k_other, &v_other).unwrap();
423        batch.commit().unwrap();
424
425        let found: BTreeSet<TypeTag> = schema
426            .iter_balances_owned_by(owner(1))
427            .unwrap()
428            .map(|res| res.unwrap().0.coin_type)
429            .collect();
430        assert_eq!(found, target_types);
431    }
432}