1use super::available_range::AvailableRange;
5use super::cursor::{self, Page, RawPaginated, ScanLimited, Target};
6use super::uint53::UInt53;
7use super::{big_int::BigInt, move_type::MoveType, sui_address::SuiAddress};
8use crate::consistency::Checkpointed;
9use crate::data::{Db, DbConnection, QueryExecutor};
10use crate::error::Error;
11use crate::raw_query::RawQuery;
12use crate::{filter, query};
13use async_graphql::connection::{Connection, CursorType, Edge};
14use async_graphql::*;
15use diesel::{
16 sql_types::{BigInt as SqlBigInt, Nullable, Text},
17 OptionalExtension, QueryableByName,
18};
19use diesel_async::scoped_futures::ScopedFutureExt;
20use serde::{Deserialize, Serialize};
21use std::str::FromStr;
22use sui_indexer::types::OwnerType;
23use sui_types::TypeTag;
24
// Aggregate balance of a single coin type, as exposed through GraphQL.
//
// NOTE(review): this type derives `SimpleObject`, and async-graphql publishes rustdoc (`///`)
// comments as schema descriptions. Plain `//` comments are used here so that documenting the
// struct does not change the generated schema -- switch to `///` if descriptions are wanted.
#[derive(Clone, Debug, SimpleObject)]
pub(crate) struct Balance {
    // Coin type the balance is for, parsed from the `coin_type` text of a `StoredBalance`.
    pub(crate) coin_type: MoveType,
    // Number of rows that were aggregated into this balance (the `count` column of a
    // `StoredBalance`); `None` when that column is NULL.
    pub(crate) coin_object_count: Option<UInt53>,
    // Sum of `coin_balance` over those rows (the `balance` column of a `StoredBalance`);
    // `None` when that column is NULL.
    pub(crate) total_balance: Option<BigInt>,
}
35
/// One row of the result set produced by `balance_query`, loaded by column name.
///
/// The column names and SQL types below must line up with the `SELECT` list in `balance_query`
/// (`balance`, `count`, `coin_type`).
#[derive(QueryableByName)]
pub struct StoredBalance {
    /// `CAST(SUM(coin_balance) AS TEXT)`: the summed balance, carried as text and parsed into a
    /// `BigInt` when the row is converted into a `Balance`.
    #[diesel(sql_type = Nullable<Text>)]
    pub balance: Option<String>,
    /// `COUNT(*)`: how many rows were grouped under this coin type.
    #[diesel(sql_type = Nullable<SqlBigInt>)]
    pub count: Option<i64>,
    /// The grouping key; parsed with `TypeTag::from_str` when converting into a `Balance`.
    #[diesel(sql_type = Text)]
    pub coin_type: String,
}
47
/// Pagination cursor for balances: a `BalanceCursor` wrapped in `cursor::JsonCursor`.
pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
49
/// Payload of a balance pagination cursor.
///
/// Field names are shortened on the wire via `serde(rename)` (`t` and `c`).
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct BalanceCursor {
    /// Position in the result set: the `coin_type` of the row the cursor points at. This is the
    /// value compared against the `coin_type` column by `filter_ge` / `filter_le`.
    #[serde(rename = "t")]
    coin_type: String,
    /// Checkpoint the cursor was created at. `Balance::paginate` prefers this value over its own
    /// `checkpoint_viewed_at` argument when cursors are supplied.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
60
61impl Balance {
62 pub(crate) async fn query(
66 db: &Db,
67 address: SuiAddress,
68 coin_type: TypeTag,
69 checkpoint_viewed_at: u64,
70 ) -> Result<Option<Balance>, Error> {
71 let stored: Option<StoredBalance> = db
72 .execute_repeatable(move |conn| {
73 async move {
74 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await?
75 else {
76 return Ok::<_, diesel::result::Error>(None);
77 };
78
79 conn.result(move || {
80 balance_query(address, Some(coin_type.clone()), range).into_boxed()
81 })
82 .await
83 .optional()
84 }
85 .scope_boxed()
86 })
87 .await?;
88
89 stored.map(Balance::try_from).transpose()
90 }
91
92 pub(crate) async fn paginate(
95 db: &Db,
96 page: Page<Cursor>,
97 address: SuiAddress,
98 checkpoint_viewed_at: u64,
99 ) -> Result<Connection<String, Balance>, Error> {
100 let cursor_viewed_at = page.validate_cursor_consistency()?;
104 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
105
106 let Some((prev, next, results)) = db
107 .execute_repeatable(move |conn| {
108 async move {
109 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at).await?
110 else {
111 return Ok::<_, diesel::result::Error>(None);
112 };
113
114 let result = page
115 .paginate_raw_query::<StoredBalance>(
116 conn,
117 checkpoint_viewed_at,
118 balance_query(address, None, range),
119 )
120 .await?;
121
122 Ok(Some(result))
123 }
124 .scope_boxed()
125 })
126 .await?
127 else {
128 return Err(Error::Client(
129 "Requested data is outside the available range".to_string(),
130 ));
131 };
132
133 let mut conn = Connection::new(prev, next);
134 for stored in results {
135 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
136 let balance = Balance::try_from(stored)?;
137 conn.edges.push(Edge::new(cursor, balance));
138 }
139
140 Ok(conn)
141 }
142}
143
144impl RawPaginated<Cursor> for StoredBalance {
145 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
146 filter!(query, "coin_type >= {}", cursor.coin_type.clone())
147 }
148
149 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
150 filter!(query, "coin_type <= {}", cursor.coin_type.clone())
151 }
152
153 fn order(asc: bool, query: RawQuery) -> RawQuery {
154 if asc {
155 return query.order_by("coin_type ASC");
156 }
157 query.order_by("coin_type DESC")
158 }
159}
160
161impl Target<Cursor> for StoredBalance {
162 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
163 Cursor::new(BalanceCursor {
164 coin_type: self.coin_type.clone(),
165 checkpoint_viewed_at,
166 })
167 }
168}
169
/// Exposes the checkpoint recorded in a balance cursor.
impl Checkpointed for Cursor {
    /// Returns the `checkpoint_viewed_at` value stored in the cursor's `BalanceCursor` payload.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
175
// No trait items are overridden for balance cursors.
// NOTE(review): presumably the defaults provided by `ScanLimited` are the intended behaviour
// here -- confirm against the trait definition in `cursor`.
impl ScanLimited for Cursor {}
177
178impl TryFrom<StoredBalance> for Balance {
179 type Error = Error;
180
181 fn try_from(s: StoredBalance) -> Result<Self, Error> {
182 let StoredBalance {
183 balance,
184 count,
185 coin_type,
186 } = s;
187 let total_balance = balance
188 .map(|b| BigInt::from_str(&b))
189 .transpose()
190 .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
191
192 let coin_object_count = count.map(|c| UInt53::from(c as u64));
193
194 let coin_type = TypeTag::from_str(&coin_type)
195 .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?
196 .into();
197
198 Ok(Balance {
199 coin_type,
200 coin_object_count,
201 total_balance,
202 })
203 }
204}
205
206fn balance_query(
210 address: SuiAddress,
211 coin_type: Option<TypeTag>,
212 range: AvailableRange,
213) -> RawQuery {
214 let mut snapshot_objs = query!("SELECT * FROM objects_snapshot");
217 snapshot_objs = filter(snapshot_objs, address, coin_type.clone());
218
219 let mut history_objs = query!("SELECT * FROM objects_history");
222 history_objs = filter(history_objs, address, coin_type.clone());
223 history_objs = filter!(
224 history_objs,
225 format!(
226 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
227 range.first, range.last
228 )
229 );
230
231 let candidates = query!(
233 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) o"#,
234 snapshot_objs,
235 history_objs
236 )
237 .order_by("object_id")
238 .order_by("object_version DESC");
239
240 let mut newer = query!("SELECT object_id, object_version FROM objects_history");
244 newer = filter!(
245 newer,
246 format!(
247 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
248 range.first, range.last
249 )
250 );
251 let final_ = query!(
252 r#"SELECT
253 CAST(SUM(coin_balance) AS TEXT) as balance,
254 COUNT(*) as count,
255 coin_type
256 FROM ({}) candidates
257 LEFT JOIN ({}) newer
258 ON (
259 candidates.object_id = newer.object_id
260 AND candidates.object_version < newer.object_version
261 )"#,
262 candidates,
263 newer
264 );
265
266 filter!(final_, "newer.object_version IS NULL").group_by("coin_type")
268}
269
270fn filter(mut query: RawQuery, owner: SuiAddress, coin_type: Option<TypeTag>) -> RawQuery {
273 query = filter!(query, "coin_type IS NOT NULL AND object_status = 0");
274
275 query = filter!(
276 query,
277 format!(
278 "owner_id = '\\x{}'::bytea AND owner_type = {}",
279 hex::encode(owner.into_vec()),
280 OwnerType::Address as i16
281 )
282 );
283
284 if let Some(coin_type) = coin_type {
285 query = filter!(
286 query,
287 "coin_type = {}",
288 coin_type.to_canonical_display(true)
289 );
290 };
291
292 query
293}