sui_indexer_alt_jsonrpc/api/
coin.rs1use std::str::FromStr;
5
6use anyhow::Context as _;
7use diesel::prelude::*;
8use diesel::sql_types::Bool;
9use futures::future;
10use jsonrpsee::{core::RpcResult, http_client::HttpClient, proc_macros::rpc};
11use move_core_types::language_storage::{StructTag, TypeTag};
12use serde::{Deserialize, Serialize};
13use sui_indexer_alt_reader::coin_metadata::CoinMetadataKey;
14use sui_indexer_alt_schema::objects::StoredCoinOwnerKind;
15use sui_indexer_alt_schema::schema::coin_balance_buckets;
16use sui_json_rpc_types::{Balance, Coin, Page as PageResponse, SuiCoinMetadata};
17use sui_open_rpc::Module;
18use sui_open_rpc_macros::open_rpc;
19use sui_sql_macro::sql;
20use sui_types::coin::CoinMetadata;
21use sui_types::coin_registry::Currency;
22use sui_types::object::Object;
23use sui_types::{
24 base_types::{ObjectID, SuiAddress},
25 gas_coin::GAS,
26};
27
28use crate::{
29 context::Context,
30 data::load_live,
31 error::{InternalContext, RpcError, client_error_to_error_object, invalid_params},
32 paginate::{BcsCursor, Cursor as _, Page},
33};
34
35use super::rpc_module::RpcModule;
36
37#[open_rpc(namespace = "suix", tag = "Coin API")]
38#[rpc(server, namespace = "suix")]
39trait CoinsApi {
40 #[method(name = "getCoins")]
43 async fn get_coins(
44 &self,
45 owner: SuiAddress,
47 coin_type: Option<String>,
49 cursor: Option<String>,
51 limit: Option<usize>,
53 ) -> RpcResult<PageResponse<Coin, String>>;
54
55 #[method(name = "getCoinMetadata")]
59 async fn get_coin_metadata(
60 &self,
61 coin_type: String,
63 ) -> RpcResult<Option<SuiCoinMetadata>>;
64}
65
66#[open_rpc(namespace = "suix", tag = "Delegation Coin API")]
68#[rpc(server, client, namespace = "suix")]
69trait DelegationCoinsApi {
70 #[method(name = "getAllBalances")]
72 async fn get_all_balances(
73 &self,
74 owner: SuiAddress,
76 ) -> RpcResult<Vec<Balance>>;
77
78 #[method(name = "getBalance")]
81 async fn get_balance(
82 &self,
83 owner: SuiAddress,
85 coin_type: Option<String>,
87 ) -> RpcResult<Balance>;
88}
89
90pub(crate) struct Coins(pub Context);
91pub(crate) struct DelegationCoins(HttpClient);
92
93#[derive(thiserror::Error, Debug)]
94pub(crate) enum Error {
95 #[error("Pagination issue: {0}")]
96 Pagination(#[from] crate::paginate::Error),
97
98 #[error("Failed to parse type {0:?}: {1}")]
99 BadType(String, anyhow::Error),
100}
101
102#[derive(Queryable, Debug, Serialize, Deserialize)]
103#[diesel(table_name = coin_balance_buckets)]
104struct BalanceCursor {
105 object_id: Vec<u8>,
106 cp_sequence_number: u64,
107 coin_balance_bucket: u64,
108}
109
110type Cursor = BcsCursor<BalanceCursor>;
111
112impl DelegationCoins {
113 pub(crate) fn new(client: HttpClient) -> Self {
114 Self(client)
115 }
116}
117
118#[async_trait::async_trait]
119impl CoinsApiServer for Coins {
120 async fn get_coins(
121 &self,
122 owner: SuiAddress,
123 coin_type: Option<String>,
124 cursor: Option<String>,
125 limit: Option<usize>,
126 ) -> RpcResult<PageResponse<Coin, String>> {
127 let coin_type_tag = if let Some(coin_type) = coin_type {
128 sui_types::parse_sui_type_tag(&coin_type)
129 .map_err(|e| invalid_params(Error::BadType(coin_type, e)))?
130 } else {
131 GAS::type_tag()
132 };
133
134 let Self(ctx) = self;
135 let config = &ctx.config().coins;
136
137 let page: Page<Cursor> = Page::from_params::<Error>(
138 config.default_page_size,
139 config.max_page_size,
140 cursor,
141 limit,
142 None,
143 )?;
144
145 let coin_id_page = filter_coins(ctx, owner, Some(coin_type_tag), Some(page)).await?;
147
148 let coin_futures = coin_id_page.data.iter().map(|id| coin_response(ctx, *id));
149
150 let coins = future::join_all(coin_futures)
151 .await
152 .into_iter()
153 .zip(coin_id_page.data)
154 .map(|(r, id)| r.with_internal_context(|| format!("Failed to get object {id}")))
155 .collect::<Result<Vec<_>, _>>()?;
156
157 Ok(PageResponse {
158 data: coins,
159 next_cursor: coin_id_page.next_cursor,
160 has_next_page: coin_id_page.has_next_page,
161 })
162 }
163
164 async fn get_coin_metadata(&self, coin_type: String) -> RpcResult<Option<SuiCoinMetadata>> {
165 let Self(ctx) = self;
166
167 if let Some(currency) = coin_registry_response(ctx, &coin_type)
168 .await
169 .with_internal_context(|| format!("Failed to fetch Currency for {coin_type:?}"))?
170 {
171 return Ok(Some(currency));
172 }
173
174 if let Some(metadata) = coin_metadata_response(ctx, &coin_type)
175 .await
176 .with_internal_context(|| format!("Failed to fetch CoinMetadata for {coin_type:?}"))?
177 {
178 return Ok(Some(metadata));
179 }
180
181 Ok(None)
182 }
183}
184
185#[async_trait::async_trait]
186impl DelegationCoinsApiServer for DelegationCoins {
187 async fn get_all_balances(&self, owner: SuiAddress) -> RpcResult<Vec<Balance>> {
188 let Self(client) = self;
189
190 client
191 .get_all_balances(owner)
192 .await
193 .map_err(client_error_to_error_object)
194 }
195
196 async fn get_balance(
197 &self,
198 owner: SuiAddress,
199 coin_type: Option<String>,
200 ) -> RpcResult<Balance> {
201 let Self(client) = self;
202
203 client
204 .get_balance(owner, coin_type)
205 .await
206 .map_err(client_error_to_error_object)
207 }
208}
209
210impl RpcModule for Coins {
211 fn schema(&self) -> Module {
212 CoinsApiOpenRpc::module_doc()
213 }
214
215 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
216 self.into_rpc()
217 }
218}
219
220impl RpcModule for DelegationCoins {
221 fn schema(&self) -> Module {
222 DelegationCoinsApiOpenRpc::module_doc()
223 }
224
225 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
226 self.into_rpc()
227 }
228}
229
230async fn filter_coins(
231 ctx: &Context,
232 owner: SuiAddress,
233 coin_type_tag: Option<TypeTag>,
234 page: Option<Page<Cursor>>,
235) -> Result<PageResponse<ObjectID, String>, RpcError<Error>> {
236 use coin_balance_buckets::dsl as cb;
237
238 let mut conn = ctx
239 .pg_reader()
240 .connect()
241 .await
242 .context("Failed to connect to database")?;
243
244 let (candidates, newer) = diesel::alias!(
246 coin_balance_buckets as candidates,
247 coin_balance_buckets as newer
248 );
249
250 macro_rules! candidates {
252 ($field:ident) => {
253 candidates.field(cb::$field)
254 };
255 }
256
257 macro_rules! newer {
258 ($field:ident) => {
259 newer.field(cb::$field)
260 };
261 }
262
263 let mut query = candidates
265 .select((
266 candidates!(object_id),
267 candidates!(cp_sequence_number),
268 candidates!(coin_balance_bucket).assume_not_null(),
269 ))
270 .left_join(
271 newer.on(candidates!(object_id)
272 .eq(newer!(object_id))
273 .and(candidates!(cp_sequence_number).lt(newer!(cp_sequence_number)))),
274 )
275 .filter(newer!(object_id).is_null())
276 .filter(candidates!(owner_kind).eq(StoredCoinOwnerKind::Fastpath))
277 .filter(candidates!(owner_id).eq(owner.to_vec()))
278 .into_boxed();
279
280 if let Some(coin_type_tag) = coin_type_tag {
281 let serialized_coin_type =
282 bcs::to_bytes(&coin_type_tag).context("Failed to serialize coin type tag")?;
283 query = query.filter(candidates!(coin_type).eq(serialized_coin_type));
284 }
285
286 let (cursor, limit) = page.map_or((None, None), |p| (p.cursor, Some(p.limit)));
287
288 if let Some(c) = cursor {
290 query = query.filter(sql!(as Bool,
291 "(candidates.coin_balance_bucket, candidates.cp_sequence_number, candidates.object_id) < ({SmallInt}, {BigInt}, {Bytea})",
292 c.coin_balance_bucket as i16,
293 c.cp_sequence_number as i64,
294 c.object_id.clone(),
295 ));
296 }
297
298 query = query
300 .order_by(candidates!(coin_balance_bucket).desc())
301 .then_order_by(candidates!(cp_sequence_number).desc())
302 .then_order_by(candidates!(object_id).desc());
303
304 if let Some(limit) = limit {
305 query = query.limit(limit + 1);
306 }
307
308 let mut buckets: Vec<(Vec<u8>, i64, i16)> =
309 conn.results(query).await.context("Failed to query coins")?;
310
311 let mut has_next_page = false;
312
313 if let Some(limit) = limit {
314 has_next_page = buckets.len() > limit as usize;
316 if has_next_page {
317 buckets.truncate(limit as usize);
318 }
319 }
320
321 let next_cursor = buckets
322 .last()
323 .map(|(object_id, cp_sequence_number, coin_balance_bucket)| {
324 BcsCursor(BalanceCursor {
325 object_id: object_id.clone(),
326 cp_sequence_number: *cp_sequence_number as u64,
327 coin_balance_bucket: *coin_balance_bucket as u64,
328 })
329 .encode()
330 })
331 .transpose()
332 .context("Failed to encode cursor")?;
333
334 let ids = buckets
335 .iter()
336 .map(|(object_id, _, _)| ObjectID::from_bytes(object_id))
337 .collect::<Result<Vec<_>, _>>()
338 .context("Failed to parse object id")?;
339
340 Ok(PageResponse {
341 data: ids,
342 next_cursor,
343 has_next_page,
344 })
345}
346
347async fn coin_response(ctx: &Context, id: ObjectID) -> Result<Coin, RpcError<Error>> {
348 let (object, coin_type, balance) = object_with_coin_data(ctx, id).await?;
349
350 let coin_object_id = object.id();
351 let digest = object.digest();
352 let version = object.version();
353 let previous_transaction = object.as_inner().previous_transaction;
354
355 Ok(Coin {
356 coin_type,
357 coin_object_id,
358 version,
359 digest,
360 balance,
361 previous_transaction,
362 })
363}
364
365async fn coin_registry_response(
366 ctx: &Context,
367 coin_type: &str,
368) -> Result<Option<SuiCoinMetadata>, RpcError<Error>> {
369 let coin_type = TypeTag::from_str(coin_type)
370 .map_err(|e| invalid_params(Error::BadType(coin_type.to_owned(), e)))?;
371
372 let currency_id = Currency::derive_object_id(coin_type)
373 .context("Failed to derive object id for coin registry Currency")?;
374
375 let Some(object) = load_live(ctx, currency_id)
376 .await
377 .context("Failed to load Currency object")?
378 else {
379 return Ok(None);
380 };
381
382 let Some(move_object) = object.data.try_as_move() else {
383 return Ok(None);
384 };
385
386 let currency: Currency =
387 bcs::from_bytes(move_object.contents()).context("Failed to parse Currency object")?;
388
389 Ok(Some(currency.into()))
390}
391
392async fn coin_metadata_response(
393 ctx: &Context,
394 coin_type: &str,
395) -> Result<Option<SuiCoinMetadata>, RpcError<Error>> {
396 let coin_type = StructTag::from_str(coin_type)
397 .map_err(|e| invalid_params(Error::BadType(coin_type.to_owned(), e)))?;
398
399 let Some(stored) = ctx
400 .pg_loader()
401 .load_one(CoinMetadataKey(coin_type))
402 .await
403 .context("Failed to load info for CoinMetadata")?
404 else {
405 return Ok(None);
406 };
407
408 let id = ObjectID::from_bytes(&stored.object_id).context("Failed to parse ObjectID")?;
409
410 let Some(object) = load_live(ctx, id)
411 .await
412 .context("Failed to load latest version of CoinMetadata")?
413 else {
414 return Ok(None);
415 };
416
417 let Some(move_object) = object.data.try_as_move() else {
418 return Ok(None);
419 };
420
421 let coin_metadata: CoinMetadata =
422 bcs::from_bytes(move_object.contents()).context("Failed to parse Currency object")?;
423
424 Ok(Some(coin_metadata.into()))
425}
426
427async fn object_with_coin_data(
428 ctx: &Context,
429 id: ObjectID,
430) -> Result<(Object, String, u64), RpcError<Error>> {
431 let object = load_live(ctx, id)
432 .await?
433 .with_context(|| format!("Failed to load latest object {id}"))?;
434
435 let coin = object
436 .as_coin_maybe()
437 .context("Object is expected to be a coin")?;
438 let coin_type = object
439 .coin_type_maybe()
440 .context("Object is expected to have a coin type")?
441 .to_canonical_string(true);
442 Ok((object, coin_type, coin.balance.value()))
443}