1use async_trait::async_trait;
5use jsonrpsee::core::RpcResult;
6use jsonrpsee::core::SubscriptionResult;
7use jsonrpsee::{PendingSubscriptionSink, RpcModule};
8use tap::TapFallible;
9
10use sui_json_rpc::SuiRpcModule;
11use sui_json_rpc_api::{IndexerApiServer, cap_page_limit};
12use sui_json_rpc_types::{
13 DynamicFieldPage, EventFilter, EventPage, ObjectsPage, Page, SuiObjectResponse,
14 SuiObjectResponseQuery, SuiTransactionBlockResponseQuery, TransactionBlocksPage,
15 TransactionFilter,
16};
17use sui_name_service::{Domain, NameRecord, NameServiceConfig, NameServiceError};
18use sui_open_rpc::Module;
19use sui_types::TypeTag;
20use sui_types::base_types::{ObjectID, SuiAddress};
21use sui_types::digests::TransactionDigest;
22use sui_types::dynamic_field::{DynamicFieldName, Field};
23use sui_types::error::SuiObjectResponseError;
24use sui_types::event::EventID;
25use sui_types::object::ObjectRead;
26
27use crate::IndexerError;
28use crate::indexer_reader::IndexerReader;
29
30pub(crate) struct IndexerApi {
31 inner: IndexerReader,
32 name_service_config: NameServiceConfig,
33}
34
35impl IndexerApi {
36 pub fn new(inner: IndexerReader, name_service_config: NameServiceConfig) -> Self {
37 Self {
38 inner,
39 name_service_config,
40 }
41 }
42
43 async fn get_owned_objects_internal(
44 &self,
45 address: SuiAddress,
46 query: Option<SuiObjectResponseQuery>,
47 cursor: Option<ObjectID>,
48 limit: usize,
49 ) -> RpcResult<ObjectsPage> {
50 let SuiObjectResponseQuery { filter, options } = query.unwrap_or_default();
51 let options = options.unwrap_or_default();
52 let objects = self
53 .inner
54 .get_owned_objects(address, filter, cursor, limit + 1)
55 .await?;
56
57 let mut object_futures = vec![];
58 for object in objects {
59 object_futures.push(tokio::task::spawn(
60 object.try_into_object_read(self.inner.package_resolver()),
61 ));
62 }
63 let mut objects = futures::future::join_all(object_futures)
64 .await
65 .into_iter()
66 .collect::<Result<Vec<_>, _>>()
67 .map_err(|e| {
68 tracing::error!("Error joining object read futures.");
69 crate::errors::IndexerError::from(e)
70 })?
71 .into_iter()
72 .collect::<Result<Vec<_>, _>>()
73 .tap_err(|e| tracing::error!("Error converting object to object read: {}", e))?;
74 let has_next_page = objects.len() > limit;
75 objects.truncate(limit);
76
77 let next_cursor = objects.last().map(|o_read| o_read.object_id());
78 let mut parallel_tasks = vec![];
79 for o in objects {
80 let options = options.clone();
81 parallel_tasks.push(tokio::task::spawn(async move {
82 match o {
83 ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
84 SuiObjectResponseError::NotExists { object_id: id },
85 )),
86 ObjectRead::Exists(object_ref, o, layout) => {
87 if options.show_display {
88 Err(IndexerError::NotSupportedError(
89 "Display fields are not supported".to_owned(),
90 )
91 .into())
92 } else {
93 Ok(SuiObjectResponse::new_with_data(
94 (object_ref, o, layout, options, None).try_into()?,
95 ))
96 }
97 }
98 ObjectRead::Deleted((object_id, version, digest)) => Ok(
99 SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
100 object_id,
101 version,
102 digest,
103 }),
104 ),
105 }
106 }));
107 }
108 let data = futures::future::join_all(parallel_tasks)
109 .await
110 .into_iter()
111 .collect::<Result<Vec<_>, _>>()
112 .map_err(|e: tokio::task::JoinError| anyhow::anyhow!(e))
113 .map_err(IndexerError::from)?
114 .into_iter()
115 .collect::<Result<Vec<_>, anyhow::Error>>()
116 .map_err(IndexerError::from)?;
117
118 Ok(Page {
119 data,
120 next_cursor,
121 has_next_page,
122 })
123 }
124}
125
126#[async_trait]
127impl IndexerApiServer for IndexerApi {
128 async fn get_owned_objects(
129 &self,
130 address: SuiAddress,
131 query: Option<SuiObjectResponseQuery>,
132 cursor: Option<ObjectID>,
133 limit: Option<usize>,
134 ) -> RpcResult<ObjectsPage> {
135 let limit = cap_page_limit(limit);
136 if limit == 0 {
137 return Ok(ObjectsPage::empty());
138 }
139 self.get_owned_objects_internal(address, query, cursor, limit)
140 .await
141 }
142
143 async fn query_transaction_blocks(
144 &self,
145 query: SuiTransactionBlockResponseQuery,
146 cursor: Option<TransactionDigest>,
147 limit: Option<usize>,
148 descending_order: Option<bool>,
149 ) -> RpcResult<TransactionBlocksPage> {
150 let limit = cap_page_limit(limit);
151 if limit == 0 {
152 return Ok(TransactionBlocksPage::empty());
153 }
154 let mut results = self
155 .inner
156 .query_transaction_blocks(
157 query.filter,
158 query.options.unwrap_or_default(),
159 cursor,
160 limit + 1,
161 descending_order.unwrap_or(false),
162 )
163 .await?;
164
165 let has_next_page = results.len() > limit;
166 results.truncate(limit);
167 let next_cursor = results.last().map(|o| o.digest);
168 Ok(Page {
169 data: results,
170 next_cursor,
171 has_next_page,
172 })
173 }
174
175 async fn query_events(
176 &self,
177 query: EventFilter,
178 cursor: Option<EventID>,
180 limit: Option<usize>,
181 descending_order: Option<bool>,
182 ) -> RpcResult<EventPage> {
183 let limit = cap_page_limit(limit);
184 if limit == 0 {
185 return Ok(EventPage::empty());
186 }
187 let descending_order = descending_order.unwrap_or(false);
188 let mut results = self
189 .inner
190 .query_events(query, cursor, limit + 1, descending_order)
191 .await?;
192
193 let has_next_page = results.len() > limit;
194 results.truncate(limit);
195 let next_cursor = results.last().map(|o| o.id);
196 Ok(Page {
197 data: results,
198 next_cursor,
199 has_next_page,
200 })
201 }
202
203 async fn get_dynamic_fields(
204 &self,
205 parent_object_id: ObjectID,
206 cursor: Option<ObjectID>,
207 limit: Option<usize>,
208 ) -> RpcResult<DynamicFieldPage> {
209 let limit = cap_page_limit(limit);
210 if limit == 0 {
211 return Ok(DynamicFieldPage::empty());
212 }
213 let mut results = self
214 .inner
215 .get_dynamic_fields(parent_object_id, cursor, limit + 1)
216 .await?;
217
218 let has_next_page = results.len() > limit;
219 results.truncate(limit);
220 let next_cursor = results.last().map(|o| o.object_id);
221 Ok(Page {
222 data: results.into_iter().map(Into::into).collect(),
223 next_cursor,
224 has_next_page,
225 })
226 }
227
228 async fn get_dynamic_field_object(
229 &self,
230 parent_object_id: ObjectID,
231 name: DynamicFieldName,
232 ) -> RpcResult<SuiObjectResponse> {
233 let name_bcs_value = self.inner.bcs_name_from_dynamic_field_name(&name).await?;
234 let id = sui_types::dynamic_field::derive_dynamic_field_id(
236 parent_object_id,
237 &name.type_,
238 &name_bcs_value,
239 )
240 .expect("deriving dynamic field id can't fail");
241
242 let options = sui_json_rpc_types::SuiObjectDataOptions::full_content();
243 match self.inner.get_object_read(id).await? {
244 sui_types::object::ObjectRead::NotExists(_)
245 | sui_types::object::ObjectRead::Deleted(_) => {}
246 sui_types::object::ObjectRead::Exists(object_ref, o, layout) => {
247 return Ok(SuiObjectResponse::new_with_data(
248 (object_ref, o, layout, options, None)
249 .try_into()
250 .map_err(IndexerError::from)?,
251 ));
252 }
253 }
254
255 let dynamic_object_field_struct =
257 sui_types::dynamic_field::DynamicFieldInfo::dynamic_object_field_wrapper(name.type_);
258 let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
259 let dynamic_object_field_id = sui_types::dynamic_field::derive_dynamic_field_id(
260 parent_object_id,
261 &dynamic_object_field_type,
262 &name_bcs_value,
263 )
264 .expect("deriving dynamic field id can't fail");
265 match self.inner.get_object_read(dynamic_object_field_id).await? {
266 sui_types::object::ObjectRead::NotExists(_)
267 | sui_types::object::ObjectRead::Deleted(_) => {}
268 sui_types::object::ObjectRead::Exists(object_ref, o, layout) => {
269 return Ok(SuiObjectResponse::new_with_data(
270 (object_ref, o, layout, options, None)
271 .try_into()
272 .map_err(IndexerError::from)?,
273 ));
274 }
275 }
276
277 Ok(SuiObjectResponse::new_with_error(
278 sui_types::error::SuiObjectResponseError::DynamicFieldNotFound { parent_object_id },
279 ))
280 }
281
282 fn subscribe_event(
283 &self,
284 _sink: PendingSubscriptionSink,
285 _filter: EventFilter,
286 ) -> SubscriptionResult {
287 Err("disabled".into())
288 }
289
290 fn subscribe_transaction(
291 &self,
292 _sink: PendingSubscriptionSink,
293 _filter: TransactionFilter,
294 ) -> SubscriptionResult {
295 Err("disabled".into())
296 }
297
298 async fn resolve_name_service_address(&self, name: String) -> RpcResult<Option<SuiAddress>> {
299 let domain: Domain = name.parse().map_err(IndexerError::NameServiceError)?;
300 let parent_domain = domain.parent();
301
302 let record_id = self.name_service_config.record_field_id(&domain);
304 let parent_record_id = self.name_service_config.record_field_id(&parent_domain);
305
306 let current_timestamp = self.inner.get_latest_checkpoint().await?.timestamp_ms;
308
309 let mut requests = vec![record_id];
311
312 if domain.is_subdomain() {
314 requests.push(parent_record_id);
315 }
316
317 let domains: Vec<_> = self
320 .inner
321 .multi_get_objects(requests)
322 .await?
323 .into_iter()
324 .map(|o| sui_types::object::Object::try_from(o).ok())
325 .collect();
326
327 let Some(requested_object) = domains
331 .iter()
332 .find(|o| o.as_ref().is_some_and(|o| o.id() == record_id))
333 .and_then(|o| o.clone())
334 else {
335 return Ok(None);
336 };
337
338 let name_record: NameRecord = requested_object.try_into().map_err(IndexerError::from)?;
339
340 if !name_record.is_leaf_record() {
342 return if !name_record.is_node_expired(current_timestamp) {
343 Ok(name_record.target_address)
344 } else {
345 Err(IndexerError::NameServiceError(NameServiceError::NameExpired).into())
346 };
347 }
348
349 let Some(requested_object) = domains
351 .iter()
352 .find(|o| o.as_ref().is_some_and(|o| o.id() == parent_record_id))
353 .and_then(|o| o.clone())
354 else {
355 return Err(IndexerError::NameServiceError(NameServiceError::NameExpired).into());
356 };
357
358 let parent_record: NameRecord = requested_object.try_into().map_err(IndexerError::from)?;
359
360 if parent_record.is_valid_leaf_parent(&name_record)
361 && !parent_record.is_node_expired(current_timestamp)
362 {
363 Ok(name_record.target_address)
364 } else {
365 Err(IndexerError::NameServiceError(NameServiceError::NameExpired).into())
366 }
367 }
368
369 async fn resolve_name_service_names(
370 &self,
371 address: SuiAddress,
372 _cursor: Option<ObjectID>,
373 _limit: Option<usize>,
374 ) -> RpcResult<Page<String, ObjectID>> {
375 let reverse_record_id = self
376 .name_service_config
377 .reverse_record_field_id(address.as_ref());
378
379 let mut result = Page {
380 data: vec![],
381 next_cursor: None,
382 has_next_page: false,
383 };
384
385 let Some(field_reverse_record_object) =
386 self.inner.get_object(&reverse_record_id, None).await?
387 else {
388 return Ok(result);
389 };
390
391 let domain = field_reverse_record_object
392 .to_rust::<Field<SuiAddress, Domain>>()
393 .ok_or_else(|| {
394 IndexerError::PersistentStorageDataCorruptionError(format!(
395 "Malformed Object {reverse_record_id}"
396 ))
397 })?
398 .value;
399
400 let domain_name = domain.to_string();
401
402 let resolved_address = self
404 .resolve_name_service_address(domain_name.clone())
405 .await?;
406
407 if resolved_address.is_none() {
409 return Ok(result);
410 }
411
412 result.data.push(domain_name);
414
415 Ok(result)
416 }
417}
418
419impl SuiRpcModule for IndexerApi {
420 fn rpc(self) -> RpcModule<Self> {
421 self.into_rpc()
422 }
423
424 fn rpc_doc_module() -> Module {
425 sui_json_rpc_api::IndexerApiOpenRpc::module_doc()
426 }
427}