sui_indexer/apis/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use jsonrpsee::core::RpcResult;
6use jsonrpsee::core::SubscriptionResult;
7use jsonrpsee::{PendingSubscriptionSink, RpcModule};
8use tap::TapFallible;
9
10use sui_json_rpc::SuiRpcModule;
11use sui_json_rpc_api::{IndexerApiServer, cap_page_limit};
12use sui_json_rpc_types::{
13    DynamicFieldPage, EventFilter, EventPage, ObjectsPage, Page, SuiObjectResponse,
14    SuiObjectResponseQuery, SuiTransactionBlockResponseQuery, TransactionBlocksPage,
15    TransactionFilter,
16};
17use sui_name_service::{Domain, NameRecord, NameServiceConfig, NameServiceError};
18use sui_open_rpc::Module;
19use sui_types::TypeTag;
20use sui_types::base_types::{ObjectID, SuiAddress};
21use sui_types::digests::TransactionDigest;
22use sui_types::dynamic_field::{DynamicFieldName, Field};
23use sui_types::error::SuiObjectResponseError;
24use sui_types::event::EventID;
25use sui_types::object::ObjectRead;
26
27use crate::IndexerError;
28use crate::indexer_reader::IndexerReader;
29
30pub(crate) struct IndexerApi {
31    inner: IndexerReader,
32    name_service_config: NameServiceConfig,
33}
34
35impl IndexerApi {
36    pub fn new(inner: IndexerReader, name_service_config: NameServiceConfig) -> Self {
37        Self {
38            inner,
39            name_service_config,
40        }
41    }
42
43    async fn get_owned_objects_internal(
44        &self,
45        address: SuiAddress,
46        query: Option<SuiObjectResponseQuery>,
47        cursor: Option<ObjectID>,
48        limit: usize,
49    ) -> RpcResult<ObjectsPage> {
50        let SuiObjectResponseQuery { filter, options } = query.unwrap_or_default();
51        let options = options.unwrap_or_default();
52        let objects = self
53            .inner
54            .get_owned_objects(address, filter, cursor, limit + 1)
55            .await?;
56
57        let mut object_futures = vec![];
58        for object in objects {
59            object_futures.push(tokio::task::spawn(
60                object.try_into_object_read(self.inner.package_resolver()),
61            ));
62        }
63        let mut objects = futures::future::join_all(object_futures)
64            .await
65            .into_iter()
66            .collect::<Result<Vec<_>, _>>()
67            .map_err(|e| {
68                tracing::error!("Error joining object read futures.");
69                crate::errors::IndexerError::from(e)
70            })?
71            .into_iter()
72            .collect::<Result<Vec<_>, _>>()
73            .tap_err(|e| tracing::error!("Error converting object to object read: {}", e))?;
74        let has_next_page = objects.len() > limit;
75        objects.truncate(limit);
76
77        let next_cursor = objects.last().map(|o_read| o_read.object_id());
78        let mut parallel_tasks = vec![];
79        for o in objects {
80            let options = options.clone();
81            parallel_tasks.push(tokio::task::spawn(async move {
82                match o {
83                    ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
84                        SuiObjectResponseError::NotExists { object_id: id },
85                    )),
86                    ObjectRead::Exists(object_ref, o, layout) => {
87                        if options.show_display {
88                            Err(IndexerError::NotSupportedError(
89                                "Display fields are not supported".to_owned(),
90                            )
91                            .into())
92                        } else {
93                            Ok(SuiObjectResponse::new_with_data(
94                                (object_ref, o, layout, options, None).try_into()?,
95                            ))
96                        }
97                    }
98                    ObjectRead::Deleted((object_id, version, digest)) => Ok(
99                        SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
100                            object_id,
101                            version,
102                            digest,
103                        }),
104                    ),
105                }
106            }));
107        }
108        let data = futures::future::join_all(parallel_tasks)
109            .await
110            .into_iter()
111            .collect::<Result<Vec<_>, _>>()
112            .map_err(|e: tokio::task::JoinError| anyhow::anyhow!(e))
113            .map_err(IndexerError::from)?
114            .into_iter()
115            .collect::<Result<Vec<_>, anyhow::Error>>()
116            .map_err(IndexerError::from)?;
117
118        Ok(Page {
119            data,
120            next_cursor,
121            has_next_page,
122        })
123    }
124}
125
126#[async_trait]
127impl IndexerApiServer for IndexerApi {
128    async fn get_owned_objects(
129        &self,
130        address: SuiAddress,
131        query: Option<SuiObjectResponseQuery>,
132        cursor: Option<ObjectID>,
133        limit: Option<usize>,
134    ) -> RpcResult<ObjectsPage> {
135        let limit = cap_page_limit(limit);
136        if limit == 0 {
137            return Ok(ObjectsPage::empty());
138        }
139        self.get_owned_objects_internal(address, query, cursor, limit)
140            .await
141    }
142
143    async fn query_transaction_blocks(
144        &self,
145        query: SuiTransactionBlockResponseQuery,
146        cursor: Option<TransactionDigest>,
147        limit: Option<usize>,
148        descending_order: Option<bool>,
149    ) -> RpcResult<TransactionBlocksPage> {
150        let limit = cap_page_limit(limit);
151        if limit == 0 {
152            return Ok(TransactionBlocksPage::empty());
153        }
154        let mut results = self
155            .inner
156            .query_transaction_blocks(
157                query.filter,
158                query.options.unwrap_or_default(),
159                cursor,
160                limit + 1,
161                descending_order.unwrap_or(false),
162            )
163            .await?;
164
165        let has_next_page = results.len() > limit;
166        results.truncate(limit);
167        let next_cursor = results.last().map(|o| o.digest);
168        Ok(Page {
169            data: results,
170            next_cursor,
171            has_next_page,
172        })
173    }
174
175    async fn query_events(
176        &self,
177        query: EventFilter,
178        // exclusive cursor if `Some`, otherwise start from the beginning
179        cursor: Option<EventID>,
180        limit: Option<usize>,
181        descending_order: Option<bool>,
182    ) -> RpcResult<EventPage> {
183        let limit = cap_page_limit(limit);
184        if limit == 0 {
185            return Ok(EventPage::empty());
186        }
187        let descending_order = descending_order.unwrap_or(false);
188        let mut results = self
189            .inner
190            .query_events(query, cursor, limit + 1, descending_order)
191            .await?;
192
193        let has_next_page = results.len() > limit;
194        results.truncate(limit);
195        let next_cursor = results.last().map(|o| o.id);
196        Ok(Page {
197            data: results,
198            next_cursor,
199            has_next_page,
200        })
201    }
202
203    async fn get_dynamic_fields(
204        &self,
205        parent_object_id: ObjectID,
206        cursor: Option<ObjectID>,
207        limit: Option<usize>,
208    ) -> RpcResult<DynamicFieldPage> {
209        let limit = cap_page_limit(limit);
210        if limit == 0 {
211            return Ok(DynamicFieldPage::empty());
212        }
213        let mut results = self
214            .inner
215            .get_dynamic_fields(parent_object_id, cursor, limit + 1)
216            .await?;
217
218        let has_next_page = results.len() > limit;
219        results.truncate(limit);
220        let next_cursor = results.last().map(|o| o.object_id);
221        Ok(Page {
222            data: results.into_iter().map(Into::into).collect(),
223            next_cursor,
224            has_next_page,
225        })
226    }
227
228    async fn get_dynamic_field_object(
229        &self,
230        parent_object_id: ObjectID,
231        name: DynamicFieldName,
232    ) -> RpcResult<SuiObjectResponse> {
233        let name_bcs_value = self.inner.bcs_name_from_dynamic_field_name(&name).await?;
234        // Try as Dynamic Field
235        let id = sui_types::dynamic_field::derive_dynamic_field_id(
236            parent_object_id,
237            &name.type_,
238            &name_bcs_value,
239        )
240        .expect("deriving dynamic field id can't fail");
241
242        let options = sui_json_rpc_types::SuiObjectDataOptions::full_content();
243        match self.inner.get_object_read(id).await? {
244            sui_types::object::ObjectRead::NotExists(_)
245            | sui_types::object::ObjectRead::Deleted(_) => {}
246            sui_types::object::ObjectRead::Exists(object_ref, o, layout) => {
247                return Ok(SuiObjectResponse::new_with_data(
248                    (object_ref, o, layout, options, None)
249                        .try_into()
250                        .map_err(IndexerError::from)?,
251                ));
252            }
253        }
254
255        // Try as Dynamic Field Object
256        let dynamic_object_field_struct =
257            sui_types::dynamic_field::DynamicFieldInfo::dynamic_object_field_wrapper(name.type_);
258        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
259        let dynamic_object_field_id = sui_types::dynamic_field::derive_dynamic_field_id(
260            parent_object_id,
261            &dynamic_object_field_type,
262            &name_bcs_value,
263        )
264        .expect("deriving dynamic field id can't fail");
265        match self.inner.get_object_read(dynamic_object_field_id).await? {
266            sui_types::object::ObjectRead::NotExists(_)
267            | sui_types::object::ObjectRead::Deleted(_) => {}
268            sui_types::object::ObjectRead::Exists(object_ref, o, layout) => {
269                return Ok(SuiObjectResponse::new_with_data(
270                    (object_ref, o, layout, options, None)
271                        .try_into()
272                        .map_err(IndexerError::from)?,
273                ));
274            }
275        }
276
277        Ok(SuiObjectResponse::new_with_error(
278            sui_types::error::SuiObjectResponseError::DynamicFieldNotFound { parent_object_id },
279        ))
280    }
281
282    fn subscribe_event(
283        &self,
284        _sink: PendingSubscriptionSink,
285        _filter: EventFilter,
286    ) -> SubscriptionResult {
287        Err("disabled".into())
288    }
289
290    fn subscribe_transaction(
291        &self,
292        _sink: PendingSubscriptionSink,
293        _filter: TransactionFilter,
294    ) -> SubscriptionResult {
295        Err("disabled".into())
296    }
297
298    async fn resolve_name_service_address(&self, name: String) -> RpcResult<Option<SuiAddress>> {
299        let domain: Domain = name.parse().map_err(IndexerError::NameServiceError)?;
300        let parent_domain = domain.parent();
301
302        // construct the record ids to lookup.
303        let record_id = self.name_service_config.record_field_id(&domain);
304        let parent_record_id = self.name_service_config.record_field_id(&parent_domain);
305
306        // get latest timestamp to check expiration.
307        let current_timestamp = self.inner.get_latest_checkpoint().await?.timestamp_ms;
308
309        // gather the requests to fetch in the multi_get_objs.
310        let mut requests = vec![record_id];
311
312        // we only want to fetch both the child and the parent if the domain is a subdomain.
313        if domain.is_subdomain() {
314            requests.push(parent_record_id);
315        }
316
317        // fetch both parent (if subdomain) and child records in a single get query.
318        // We do this as we do not know if the subdomain is a node or leaf record.
319        let domains: Vec<_> = self
320            .inner
321            .multi_get_objects(requests)
322            .await?
323            .into_iter()
324            .map(|o| sui_types::object::Object::try_from(o).ok())
325            .collect();
326
327        // Find the requested object in the list of domains.
328        // We need to loop (in an array of maximum size 2), as we cannot guarantee
329        // the order of the returned objects.
330        let Some(requested_object) = domains
331            .iter()
332            .find(|o| o.as_ref().is_some_and(|o| o.id() == record_id))
333            .and_then(|o| o.clone())
334        else {
335            return Ok(None);
336        };
337
338        let name_record: NameRecord = requested_object.try_into().map_err(IndexerError::from)?;
339
340        // Handle NODE record case.
341        if !name_record.is_leaf_record() {
342            return if !name_record.is_node_expired(current_timestamp) {
343                Ok(name_record.target_address)
344            } else {
345                Err(IndexerError::NameServiceError(NameServiceError::NameExpired).into())
346            };
347        }
348
349        // repeat the process for the parent object too.
350        let Some(requested_object) = domains
351            .iter()
352            .find(|o| o.as_ref().is_some_and(|o| o.id() == parent_record_id))
353            .and_then(|o| o.clone())
354        else {
355            return Err(IndexerError::NameServiceError(NameServiceError::NameExpired).into());
356        };
357
358        let parent_record: NameRecord = requested_object.try_into().map_err(IndexerError::from)?;
359
360        if parent_record.is_valid_leaf_parent(&name_record)
361            && !parent_record.is_node_expired(current_timestamp)
362        {
363            Ok(name_record.target_address)
364        } else {
365            Err(IndexerError::NameServiceError(NameServiceError::NameExpired).into())
366        }
367    }
368
369    async fn resolve_name_service_names(
370        &self,
371        address: SuiAddress,
372        _cursor: Option<ObjectID>,
373        _limit: Option<usize>,
374    ) -> RpcResult<Page<String, ObjectID>> {
375        let reverse_record_id = self
376            .name_service_config
377            .reverse_record_field_id(address.as_ref());
378
379        let mut result = Page {
380            data: vec![],
381            next_cursor: None,
382            has_next_page: false,
383        };
384
385        let Some(field_reverse_record_object) =
386            self.inner.get_object(&reverse_record_id, None).await?
387        else {
388            return Ok(result);
389        };
390
391        let domain = field_reverse_record_object
392            .to_rust::<Field<SuiAddress, Domain>>()
393            .ok_or_else(|| {
394                IndexerError::PersistentStorageDataCorruptionError(format!(
395                    "Malformed Object {reverse_record_id}"
396                ))
397            })?
398            .value;
399
400        let domain_name = domain.to_string();
401
402        // Tries to resolve the name, to verify it is not expired.
403        let resolved_address = self
404            .resolve_name_service_address(domain_name.clone())
405            .await?;
406
407        // If we do not have a resolved address, we do not include the domain in the result.
408        if resolved_address.is_none() {
409            return Ok(result);
410        }
411
412        // We push the domain name to the result and return it.
413        result.data.push(domain_name);
414
415        Ok(result)
416    }
417}
418
419impl SuiRpcModule for IndexerApi {
420    fn rpc(self) -> RpcModule<Self> {
421        self.into_rpc()
422    }
423
424    fn rpc_doc_module() -> Module {
425        sui_json_rpc_api::IndexerApiOpenRpc::module_doc()
426    }
427}