sui_graphql_rpc/types/
balance.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::available_range::AvailableRange;
5use super::cursor::{self, Page, RawPaginated, ScanLimited, Target};
6use super::uint53::UInt53;
7use super::{big_int::BigInt, move_type::MoveType, sui_address::SuiAddress};
8use crate::consistency::Checkpointed;
9use crate::data::{Db, DbConnection, QueryExecutor};
10use crate::error::Error;
11use crate::raw_query::RawQuery;
12use crate::{filter, query};
13use async_graphql::connection::{Connection, CursorType, Edge};
14use async_graphql::*;
15use diesel::{
16    sql_types::{BigInt as SqlBigInt, Nullable, Text},
17    OptionalExtension, QueryableByName,
18};
19use diesel_async::scoped_futures::ScopedFutureExt;
20use serde::{Deserialize, Serialize};
21use std::str::FromStr;
22use sui_indexer::types::OwnerType;
23use sui_types::TypeTag;
24
// NOTE: the `SimpleObject` derive exposes this struct as a GraphQL object and uses the `///`
// comments on the struct and its fields as the GraphQL descriptions, so editing those comments
// changes the published schema text. Maintainer-only notes therefore use plain `//` comments.
/// The total balance for a particular coin type.
#[derive(Clone, Debug, SimpleObject)]
pub(crate) struct Balance {
    /// Coin type for the balance, such as 0x2::sui::SUI
    pub(crate) coin_type: MoveType,
    /// How many coins of this type constitute the balance
    // `None` when the `count` column of the underlying `StoredBalance` row is NULL.
    pub(crate) coin_object_count: Option<UInt53>,
    /// Total balance across all coin objects of the coin type
    // `None` when the `balance` column of the underlying `StoredBalance` row is NULL.
    pub(crate) total_balance: Option<BigInt>,
}
35
/// Representation of a row of balance information from the DB. We read the balance as a `String` to
/// deal with the large (bigger than 2^63 - 1) balances.
///
/// The field names match the column aliases selected by `balance_query` (`balance`, `count` and
/// `coin_type`); `QueryableByName` maps result columns to fields by name.
#[derive(QueryableByName)]
pub struct StoredBalance {
    /// The summed coin balance, selected as `CAST(SUM(coin_balance) AS TEXT)`. Nullable.
    #[diesel(sql_type = Nullable<Text>)]
    pub balance: Option<String>,
    /// Number of rows aggregated into this balance, selected as `COUNT(*)`. Nullable.
    #[diesel(sql_type = Nullable<SqlBigInt>)]
    pub count: Option<i64>,
    /// The coin type the row is grouped by, as stored in the `coin_type` column.
    #[diesel(sql_type = Text)]
    pub coin_type: String,
}
47
/// Pagination cursor for balances: a `BalanceCursor` wrapped in `cursor::JsonCursor`.
pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
49
/// The inner struct for the `Balance`'s cursor. The `coin_type` is used as the cursor, while the
/// `checkpoint_viewed_at` sets the consistent upper bound for the cursor.
///
/// Fields are (de)serialized under the short keys given by their `serde(rename)` attributes.
// NOTE(review): renaming these keys presumably invalidates cursors that were already handed out to
// clients -- confirm before changing them.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct BalanceCursor {
    /// The coin type of the row this cursor points at; pagination filters and orders on it.
    #[serde(rename = "t")]
    coin_type: String,
    /// The checkpoint sequence number this was viewed at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
60
61impl Balance {
62    /// Query for the balance of coins owned by `address`, of coins with type `coin_type`. Note that
63    /// `coin_type` is the type of `0x2::coin::Coin`'s type parameter, not the full type of the coin
64    /// object.
65    pub(crate) async fn query(
66        db: &Db,
67        address: SuiAddress,
68        coin_type: TypeTag,
69        checkpoint_viewed_at: u64,
70    ) -> Result<Option<Balance>, Error> {
71        let stored: Option<StoredBalance> = db
72            .execute_repeatable(move |conn| {
73                async move {
74                    let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await?
75                    else {
76                        return Ok::<_, diesel::result::Error>(None);
77                    };
78
79                    conn.result(move || {
80                        balance_query(address, Some(coin_type.clone()), range).into_boxed()
81                    })
82                    .await
83                    .optional()
84                }
85                .scope_boxed()
86            })
87            .await?;
88
89        stored.map(Balance::try_from).transpose()
90    }
91
92    /// Query the database for a `page` of coin balances. Each balance represents the total balance
93    /// for a particular coin type, owned by `address`.
94    pub(crate) async fn paginate(
95        db: &Db,
96        page: Page<Cursor>,
97        address: SuiAddress,
98        checkpoint_viewed_at: u64,
99    ) -> Result<Connection<String, Balance>, Error> {
100        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if they are
101        // consistent. Otherwise, use the value from the parameter, or set to None. This is so that
102        // paginated queries are consistent with the previous query that created the cursor.
103        let cursor_viewed_at = page.validate_cursor_consistency()?;
104        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
105
106        let Some((prev, next, results)) = db
107            .execute_repeatable(move |conn| {
108                async move {
109                    let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await?
110                    else {
111                        return Ok::<_, diesel::result::Error>(None);
112                    };
113
114                    let result = page
115                        .paginate_raw_query::<StoredBalance>(
116                            conn,
117                            checkpoint_viewed_at,
118                            balance_query(address, None, range),
119                        )
120                        .await?;
121
122                    Ok(Some(result))
123                }
124                .scope_boxed()
125            })
126            .await?
127        else {
128            return Err(Error::Client(
129                "Requested data is outside the available range".to_string(),
130            ));
131        };
132
133        let mut conn = Connection::new(prev, next);
134        for stored in results {
135            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
136            let balance = Balance::try_from(stored)?;
137            conn.edges.push(Edge::new(cursor, balance));
138        }
139
140        Ok(conn)
141    }
142}
143
144impl RawPaginated<Cursor> for StoredBalance {
145    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
146        filter!(query, "coin_type >= {}", cursor.coin_type.clone())
147    }
148
149    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
150        filter!(query, "coin_type <= {}", cursor.coin_type.clone())
151    }
152
153    fn order(asc: bool, query: RawQuery) -> RawQuery {
154        if asc {
155            return query.order_by("coin_type ASC");
156        }
157        query.order_by("coin_type DESC")
158    }
159}
160
161impl Target<Cursor> for StoredBalance {
162    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
163        Cursor::new(BalanceCursor {
164            coin_type: self.coin_type.clone(),
165            checkpoint_viewed_at,
166        })
167    }
168}
169
impl Checkpointed for Cursor {
    /// The checkpoint sequence number recorded in this cursor's `BalanceCursor` payload.
    fn checkpoint_viewed_at(&self) -> u64 {
        // NOTE(review): the field lives on `BalanceCursor`; access through `Cursor` presumably
        // relies on `JsonCursor` dereferencing to its payload -- confirm in the `cursor` module.
        self.checkpoint_viewed_at
    }
}
175
// Empty impl: no `ScanLimited` methods are overridden for balance cursors, so whatever the trait
// provides by default applies.
impl ScanLimited for Cursor {}
177
178impl TryFrom<StoredBalance> for Balance {
179    type Error = Error;
180
181    fn try_from(s: StoredBalance) -> Result<Self, Error> {
182        let StoredBalance {
183            balance,
184            count,
185            coin_type,
186        } = s;
187        let total_balance = balance
188            .map(|b| BigInt::from_str(&b))
189            .transpose()
190            .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
191
192        let coin_object_count = count.map(|c| UInt53::from(c as u64));
193
194        let coin_type = TypeTag::from_str(&coin_type)
195            .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?
196            .into();
197
198        Ok(Balance {
199            coin_type,
200            coin_object_count,
201            total_balance,
202        })
203    }
204}
205
206/// Query the database for a `page` of coin balances. Each balance represents the total balance for
207/// a particular coin type, owned by `address`. This function is meant to be called within a thunk
208/// and returns a RawQuery that can be converted into a BoxedSqlQuery with `.into_boxed()`.
209fn balance_query(
210    address: SuiAddress,
211    coin_type: Option<TypeTag>,
212    range: AvailableRange,
213) -> RawQuery {
214    // Construct the filtered inner query - apply the same filtering criteria to both
215    // objects_snapshot and objects_history tables.
216    let mut snapshot_objs = query!("SELECT * FROM objects_snapshot");
217    snapshot_objs = filter(snapshot_objs, address, coin_type.clone());
218
219    // Additionally filter objects_history table for results between the available range, or
220    // checkpoint_viewed_at, if provided.
221    let mut history_objs = query!("SELECT * FROM objects_history");
222    history_objs = filter(history_objs, address, coin_type.clone());
223    history_objs = filter!(
224        history_objs,
225        format!(
226            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
227            range.first, range.last
228        )
229    );
230
231    // Combine the two queries, and select the most recent version of each object.
232    let candidates = query!(
233        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) o"#,
234        snapshot_objs,
235        history_objs
236    )
237    .order_by("object_id")
238    .order_by("object_version DESC");
239
240    // Objects that fulfill the filtering criteria may not be the most recent version available.
241    // Left join the candidates table on newer to filter out any objects that have a newer
242    // version.
243    let mut newer = query!("SELECT object_id, object_version FROM objects_history");
244    newer = filter!(
245        newer,
246        format!(
247            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
248            range.first, range.last
249        )
250    );
251    let final_ = query!(
252        r#"SELECT
253            CAST(SUM(coin_balance) AS TEXT) as balance,
254            COUNT(*) as count,
255            coin_type
256        FROM ({}) candidates
257        LEFT JOIN ({}) newer
258        ON (
259            candidates.object_id = newer.object_id
260            AND candidates.object_version < newer.object_version
261        )"#,
262        candidates,
263        newer
264    );
265
266    // Additionally for balance's query, group coins by coin_type.
267    filter!(final_, "newer.object_version IS NULL").group_by("coin_type")
268}
269
270/// Applies the filtering criteria for balances to the input `RawQuery` and returns a new
271/// `RawQuery`.
272fn filter(mut query: RawQuery, owner: SuiAddress, coin_type: Option<TypeTag>) -> RawQuery {
273    query = filter!(query, "coin_type IS NOT NULL AND object_status = 0");
274
275    query = filter!(
276        query,
277        format!(
278            "owner_id = '\\x{}'::bytea AND owner_type = {}",
279            hex::encode(owner.into_vec()),
280            OwnerType::Address as i16
281        )
282    );
283
284    if let Some(coin_type) = coin_type {
285        query = filter!(
286            query,
287            "coin_type = {}",
288            coin_type.to_canonical_display(/* with_prefix */ true)
289        );
290    };
291
292    query
293}