sui_rpc_store/indexer/
balance.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that feeds the
5//! [`schema::balance`](crate::schema::balance) CF.
6//!
7//! Mirrors `index_transactions` in `sui-core::rpc_index`: for
8//! every transaction in the checkpoint, call
9//! [`sui_types::balance_change::derive_detailed_balance_changes_2`]
10//! and forward the returned `(coin_amount, address_amount)`
11//! deltas straight into the CF as a single combined merge operand
12//! per `(owner, coin_type)`.
13//!
14//! The `derive_detailed_balance_changes_2` helper already
15//! consolidates input and output coin objects (for the *coin*
16//! side) and parses the effects' accumulator writes (for the
17//! *address* side), so the pipeline doesn't need to walk objects
18//! itself.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use move_core_types::language_storage::TypeTag;
25use sui_consistent_store::Batch;
26use sui_consistent_store::Restore;
27use sui_indexer_alt_framework::pipeline::Processor;
28use sui_indexer_alt_framework::pipeline::sequential;
29use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
30use sui_types::accumulator_root::AccumulatorKey;
31use sui_types::accumulator_root::AccumulatorValue;
32use sui_types::balance_change::derive_detailed_balance_changes_2;
33use sui_types::base_types::SuiAddress;
34use sui_types::coin::Coin;
35use sui_types::full_checkpoint_content::Checkpoint;
36use sui_types::object::Object;
37use sui_types::object::Owner;
38
39use crate::RpcStoreSchema;
40use crate::indexer::Schema;
41use crate::indexer::Store;
42use crate::schema::balance;
43use crate::schema::balance::Key;
44
45/// Pipeline marker for `balance`.
46pub struct Balance;
47
48#[derive(Debug)]
49pub struct Delta {
50    pub owner: SuiAddress,
51    pub coin_type: TypeTag,
52    /// Change to the coin-derived component (sum of owned
53    /// `Coin<T>` deltas).
54    pub coin: i128,
55    /// Change to the accumulator-derived component (sum of
56    /// per-tx accumulator writes against `(owner, coin_type)`).
57    pub address: i128,
58}
59
60#[async_trait]
61impl Processor for Balance {
62    const NAME: &'static str = "balance";
63    type Value = Delta;
64
65    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Delta>> {
66        let mut deltas = Vec::new();
67        for tx in &checkpoint.transactions {
68            for change in derive_detailed_balance_changes_2(&tx.effects, &checkpoint.object_set) {
69                deltas.push(Delta {
70                    owner: change.address,
71                    coin_type: change.coin_type,
72                    coin: change.coin_amount,
73                    address: change.address_amount,
74                });
75            }
76        }
77        Ok(deltas)
78    }
79}
80
81impl Restore for Balance {
82    type Schema = RpcStoreSchema;
83
84    /// Stage merge operands derived from a single live object.
85    /// Two sources contribute to a balance row, both recoverable
86    /// from the live object set:
87    ///
88    /// - **Coin half**: address-owned (and consensus-address-owned)
89    ///   `Coin<T>` objects. The coin's `balance` field is credited
90    ///   to the `(owner, coin_type)` row's coin component. Mirrors
91    ///   `index_object`'s coin path in `sui-core::rpc_index`.
92    ///
93    /// - **Address half**: dynamic-field objects parented to
94    ///   [`SUI_ACCUMULATOR_ROOT_OBJECT_ID`]. These carry the
95    ///   per-`(owner, coin_type)` accumulator balance, which the
96    ///   tip pipeline would otherwise re-derive from
97    ///   `AccumulatorWrite` events. Mirrors `get_address_balance_info`
98    ///   in `sui-core::rpc_index`.
99    ///
100    /// Everything else (shared / immutable objects, non-coin
101    /// address-owned objects, dynamic fields under other parents)
102    /// contributes no balance row.
103    fn restore(
104        &self,
105        schema: &Self::Schema,
106        object: &Object,
107        batch: &mut Batch,
108    ) -> anyhow::Result<()> {
109        match object.owner() {
110            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
111                if let Some((coin_type, value)) = coin_balance_for_restore(object)? {
112                    let (key, val) = balance::delta(*owner, coin_type, value as i128, 0);
113                    batch.merge(&schema.balance, &key, &val)?;
114                }
115            }
116            Owner::ObjectOwner(parent) if *parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into() => {
117                if let Some((owner, coin_type, balance_value)) = address_balance_info(object) {
118                    let (key, val) = balance::delta(owner, coin_type, 0, balance_value);
119                    batch.merge(&schema.balance, &key, &val)?;
120                }
121            }
122            _ => {}
123        }
124        Ok(())
125    }
126}
127
128/// Extract the `(coin_type, balance)` pair for a coin object, or
129/// `None` if `object` is not a coin or carries a non-struct type
130/// tag. Mirrors `get_balance_and_type_if_coin` in
131/// `sui-core::rpc_index`.
132fn coin_balance_for_restore(object: &Object) -> anyhow::Result<Option<(TypeTag, u64)>> {
133    Ok(Coin::extract_balance_if_coin(object)
134        .map_err(|e| anyhow::anyhow!("Failed to deserialize coin object {}: {e}", object.id()))?
135        .and_then(|(type_, value)| match type_ {
136            TypeTag::Struct(struct_tag) => Some((TypeTag::Struct(struct_tag), value)),
137            _ => None,
138        }))
139}
140
141/// Extract `(owner, coin_type, balance)` from a dynamic-field
142/// object parented to the accumulator root. Returns `None` for
143/// non-balance fields, fields whose value cannot be parsed as a
144/// `u128`, or non-positive balances. Mirrors
145/// `get_address_balance_info` in `sui-core::rpc_index`.
146fn address_balance_info(object: &Object) -> Option<(SuiAddress, TypeTag, i128)> {
147    let move_object = object.data.try_as_move()?;
148    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
149    else {
150        return None;
151    };
152    let (key, value): (AccumulatorKey, AccumulatorValue) = move_object.try_into().ok()?;
153    let balance_value = value.as_u128()? as i128;
154    if balance_value <= 0 {
155        return None;
156    }
157    Some((key.owner, TypeTag::Struct(coin_type), balance_value))
158}
159
160#[async_trait]
161impl sequential::Handler for Balance {
162    type Store = Store;
163    /// Combine deltas observed in this checkpoint by
164    /// `(owner, coin_type)` so a single combined merge operand is
165    /// staged per key instead of many small ones.
166    type Batch = HashMap<Key, (i128, i128)>;
167
168    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Delta>) {
169        for d in values {
170            let entry = batch
171                .entry(Key {
172                    owner: d.owner,
173                    coin_type: d.coin_type,
174                })
175                .or_insert((0, 0));
176            entry.0 = entry.0.saturating_add(d.coin);
177            entry.1 = entry.1.saturating_add(d.address);
178        }
179    }
180
181    async fn commit<'a>(
182        &self,
183        batch: &Self::Batch,
184        conn: &mut sui_consistent_store::Connection<'a, Schema>,
185    ) -> anyhow::Result<usize> {
186        let cf = &conn.store.schema().balance;
187        for (key, (coin, address)) in batch {
188            let (_, value) = balance::delta(key.owner, key.coin_type.clone(), *coin, *address);
189            conn.batch.merge(cf, key, &value)?;
190        }
191        Ok(batch.len())
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use std::sync::Arc;
198
199    use sui_consistent_store::Db;
200    use sui_consistent_store::DbOptions;
201    use sui_types::base_types::ObjectID;
202    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
203
204    use super::*;
205
206    #[tokio::test]
207    async fn process_runs_against_synthetic_checkpoint() {
208        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
209        let _ = Balance.process(&checkpoint).await.unwrap();
210    }
211
212    #[test]
213    fn restore_credits_coin_half_for_address_owned_gas_coin() {
214        let dir = tempfile::tempdir().unwrap();
215        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
216
217        let owner = SuiAddress::ZERO;
218        let coin = Object::with_id_owner_gas_for_testing(ObjectID::from_single_byte(5), owner, 42);
219        let coin_type = coin.coin_type_maybe().unwrap();
220
221        let mut batch = db.batch();
222        Balance.restore(&schema, &coin, &mut batch).unwrap();
223        batch.commit().unwrap();
224
225        let balance = schema
226            .get_balance(owner, coin_type)
227            .unwrap()
228            .expect("balance row present");
229        assert_eq!(balance.coin, 42);
230        // No matching accumulator-root dynamic-field object was
231        // restored alongside the coin, so the address half stays
232        // zero. A test that exercises the address half lives below.
233        assert_eq!(balance.address, 0);
234    }
235
236    #[test]
237    fn restore_skips_non_coin_objects() {
238        let dir = tempfile::tempdir().unwrap();
239        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
240
241        let owner = SuiAddress::ZERO;
242        let non_coin = Object::with_id_owner_for_testing(ObjectID::from_single_byte(9), owner);
243
244        let mut batch = db.batch();
245        Balance.restore(&schema, &non_coin, &mut batch).unwrap();
246        batch.commit().unwrap();
247        // Nothing to assert on read because we don't know the
248        // (non-coin) type to query by; the meaningful assertion
249        // is just that `restore` returned `Ok` without staging a
250        // bad write.
251    }
252}