1use std::collections::HashSet;
4use std::sync::Arc;
5
6use anyhow::bail;
7use async_trait::async_trait;
8use futures::{Stream, StreamExt, future};
9use jsonrpsee::{
10 PendingSubscriptionSink, RpcModule,
11 core::{RpcResult, SubscriptionResult},
12};
13use move_bytecode_utils::layout::TypeLayoutBuilder;
14use move_core_types::language_storage::TypeTag;
15use mysten_metrics::spawn_monitored_task;
16use serde::Serialize;
17use sui_core::authority::AuthorityState;
18use sui_json::SuiJsonValue;
19use sui_json_rpc_api::{
20 IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
21 cap_page_limit, validate_limit,
22};
23use sui_json_rpc_types::{
24 DynamicFieldPage, EventFilter, EventPage, ObjectsPage, Page, SuiObjectDataOptions,
25 SuiObjectResponse, SuiObjectResponseQuery, SuiTransactionBlockResponse,
26 SuiTransactionBlockResponseQuery, TransactionBlocksPage, TransactionFilter,
27};
28use sui_name_service::{Domain, NameRecord, NameServiceConfig, NameServiceError};
29use sui_open_rpc::Module;
30use sui_storage::key_value_store::TransactionKeyValueStore;
31use sui_types::{
32 base_types::{ObjectID, SuiAddress},
33 digests::TransactionDigest,
34 dynamic_field::{DynamicFieldName, Field},
35 error::SuiObjectResponseError,
36 event::EventID,
37};
38use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39use tracing::{instrument, warn};
40
41use crate::{
42 SuiRpcModule,
43 authority_state::{StateRead, StateReadResult},
44 error::{Error, SuiRpcInputError},
45 with_tracing,
46};
47
48pub fn spawn_subscription<S, T>(
49 sink: PendingSubscriptionSink,
50 mut rx: S,
51 permit: Option<OwnedSemaphorePermit>,
52) where
53 S: Stream<Item = T> + Unpin + Send + 'static,
54 T: Serialize + Send,
55{
56 spawn_monitored_task!(async move {
57 let Ok(sink) = sink.accept().await else {
58 return;
59 };
60 let _permit = permit;
61
62 while let Some(item) = rx.next().await {
63 let Ok(message) = jsonrpsee::server::SubscriptionMessage::from_json(&item) else {
64 break;
65 };
66 let Ok(()) = sink.send(message).await else {
67 break;
68 };
69 }
70
71 });
86}
/// Number of subscription permits created when no explicit maximum is supplied.
const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
88
89pub struct IndexerApi<R> {
90 state: Arc<dyn StateRead>,
91 read_api: R,
92 transaction_kv_store: Arc<TransactionKeyValueStore>,
93 name_service_config: NameServiceConfig,
94 pub metrics: Arc<JsonRpcMetrics>,
95 subscription_semaphore: Arc<Semaphore>,
96}
97
98impl<R: ReadApiServer> IndexerApi<R> {
99 pub fn new(
100 state: Arc<AuthorityState>,
101 read_api: R,
102 transaction_kv_store: Arc<TransactionKeyValueStore>,
103 name_service_config: NameServiceConfig,
104 metrics: Arc<JsonRpcMetrics>,
105 max_subscriptions: Option<usize>,
106 ) -> Self {
107 let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
108 Self {
109 state,
110 transaction_kv_store,
111 read_api,
112 name_service_config,
113 metrics,
114 subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
115 }
116 }
117
118 fn extract_values_from_dynamic_field_name(
119 &self,
120 name: DynamicFieldName,
121 ) -> Result<(TypeTag, Vec<u8>), SuiRpcInputError> {
122 let DynamicFieldName {
123 type_: name_type,
124 value,
125 } = name;
126 let epoch_store = self.state.load_epoch_store_one_call_per_task();
127 let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
128 let sui_json_value = SuiJsonValue::new(value)?;
129 let name_bcs_value = sui_json_value.to_bcs_bytes(&layout)?;
130 Ok((name_type, name_bcs_value))
131 }
132
133 fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
134 match self.subscription_semaphore.clone().try_acquire_owned() {
135 Ok(p) => Ok(p),
136 Err(_) => bail!("Resources exhausted"),
137 }
138 }
139
140 fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
141 let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
142
143 let checkpoint = self
144 .state
145 .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
146
147 Ok(checkpoint.timestamp_ms)
148 }
149}
150
151#[async_trait]
152impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
153 #[instrument(skip(self))]
154 async fn get_owned_objects(
155 &self,
156 address: SuiAddress,
157 query: Option<SuiObjectResponseQuery>,
158 cursor: Option<ObjectID>,
159 limit: Option<usize>,
160 ) -> RpcResult<ObjectsPage> {
161 with_tracing!(async move {
162 let limit =
163 validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(SuiRpcInputError::from)?;
164 self.metrics.get_owned_objects_limit.observe(limit as f64);
165 let SuiObjectResponseQuery { filter, options } = query.unwrap_or_default();
166 let options = options.unwrap_or_default();
167 let mut objects = self
168 .state
169 .get_owner_objects_with_limit(address, cursor, limit + 1, filter)
170 .map_err(Error::from)?;
171
172 let has_next_page = objects.len() > limit;
174 objects.truncate(limit);
175 let next_cursor = objects
176 .last()
177 .cloned()
178 .map_or(cursor, |o_info| Some(o_info.object_id));
179
180 let data = match options.is_not_in_object_info() {
181 true => {
182 let object_ids = objects.iter().map(|obj| obj.object_id).collect();
183 self.read_api
184 .multi_get_objects(object_ids, Some(options))
185 .await?
186 }
187 false => objects
188 .into_iter()
189 .map(|o_info| SuiObjectResponse::try_from((o_info, options.clone())))
190 .collect::<Result<Vec<SuiObjectResponse>, _>>()?,
191 };
192
193 self.metrics
194 .get_owned_objects_result_size
195 .observe(data.len() as f64);
196 self.metrics
197 .get_owned_objects_result_size_total
198 .inc_by(data.len() as u64);
199 Ok(Page {
200 data,
201 next_cursor,
202 has_next_page,
203 })
204 })
205 }
206
207 #[instrument(skip(self))]
208 async fn query_transaction_blocks(
209 &self,
210 query: SuiTransactionBlockResponseQuery,
211 cursor: Option<TransactionDigest>,
213 limit: Option<usize>,
214 descending_order: Option<bool>,
215 ) -> RpcResult<TransactionBlocksPage> {
216 with_tracing!(async move {
217 let limit = cap_page_limit(limit);
218 self.metrics.query_tx_blocks_limit.observe(limit as f64);
219 let descending = descending_order.unwrap_or_default();
220 let opts = query.options.unwrap_or_default();
221
222 let mut digests = self
224 .state
225 .get_transactions(
226 &self.transaction_kv_store,
227 query.filter,
228 cursor,
229 Some(limit + 1),
230 descending,
231 )
232 .await
233 .map_err(Error::from)?;
234 let mut seen = HashSet::new();
237 digests.retain(|digest| seen.insert(*digest));
238
239 let has_next_page = digests.len() > limit;
241 digests.truncate(limit);
242 let next_cursor = digests.last().cloned().map_or(cursor, Some);
243
244 let data: Vec<SuiTransactionBlockResponse> = if opts.only_digest() {
245 digests
246 .into_iter()
247 .map(SuiTransactionBlockResponse::new)
248 .collect()
249 } else {
250 self.read_api
251 .multi_get_transaction_blocks(digests, Some(opts))
252 .await?
253 };
254
255 self.metrics
256 .query_tx_blocks_result_size
257 .observe(data.len() as f64);
258 self.metrics
259 .query_tx_blocks_result_size_total
260 .inc_by(data.len() as u64);
261 Ok(Page {
262 data,
263 next_cursor,
264 has_next_page,
265 })
266 })
267 }
268 #[instrument(skip(self))]
269 async fn query_events(
270 &self,
271 query: EventFilter,
272 cursor: Option<EventID>,
274 limit: Option<usize>,
275 descending_order: Option<bool>,
276 ) -> RpcResult<EventPage> {
277 with_tracing!(async move {
278 let descending = descending_order.unwrap_or_default();
279 let limit = cap_page_limit(limit);
280 self.metrics.query_events_limit.observe(limit as f64);
281 let mut data = self
283 .state
284 .query_events(
285 &self.transaction_kv_store,
286 query,
287 cursor,
288 limit + 1,
289 descending,
290 )
291 .await
292 .map_err(Error::from)?;
293 let has_next_page = data.len() > limit;
294 data.truncate(limit);
295 let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
296 self.metrics
297 .query_events_result_size
298 .observe(data.len() as f64);
299 self.metrics
300 .query_events_result_size_total
301 .inc_by(data.len() as u64);
302 Ok(EventPage {
303 data,
304 next_cursor,
305 has_next_page,
306 })
307 })
308 }
309
310 #[instrument(skip(self))]
311 fn subscribe_event(
312 &self,
313 sink: PendingSubscriptionSink,
314 filter: EventFilter,
315 ) -> SubscriptionResult {
316 let permit = self.acquire_subscribe_permit()?;
317 spawn_subscription(
318 sink,
319 self.state
320 .get_subscription_handler()
321 .subscribe_events(filter),
322 Some(permit),
323 );
324 Ok(())
325 }
326
327 fn subscribe_transaction(
328 &self,
329 sink: PendingSubscriptionSink,
330 filter: TransactionFilter,
331 ) -> SubscriptionResult {
332 let permit = self.acquire_subscribe_permit()?;
333 spawn_subscription(
334 sink,
335 self.state
336 .get_subscription_handler()
337 .subscribe_transactions(filter),
338 Some(permit),
339 );
340 Ok(())
341 }
342
343 #[instrument(skip(self))]
344 async fn get_dynamic_fields(
345 &self,
346 parent_object_id: ObjectID,
347 cursor: Option<ObjectID>,
349 limit: Option<usize>,
350 ) -> RpcResult<DynamicFieldPage> {
351 with_tracing!(async move {
352 let limit = cap_page_limit(limit);
353 self.metrics.get_dynamic_fields_limit.observe(limit as f64);
354 let mut data = self
355 .state
356 .get_dynamic_fields(parent_object_id, cursor, limit + 1)
357 .map_err(Error::from)?;
358 let has_next_page = data.len() > limit;
359 data.truncate(limit);
360 let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
361 self.metrics
362 .get_dynamic_fields_result_size
363 .observe(data.len() as f64);
364 self.metrics
365 .get_dynamic_fields_result_size_total
366 .inc_by(data.len() as u64);
367 Ok(DynamicFieldPage {
368 data: data.into_iter().map(|(_, w)| w.into()).collect(),
369 next_cursor,
370 has_next_page,
371 })
372 })
373 }
374
375 #[instrument(skip(self))]
376 async fn get_dynamic_field_object(
377 &self,
378 parent_object_id: ObjectID,
379 name: DynamicFieldName,
380 ) -> RpcResult<SuiObjectResponse> {
381 with_tracing!(async move {
382 let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
383
384 let id = self
385 .state
386 .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
387 .map_err(Error::from)?;
388 if let Some(id) = id {
390 self.read_api
391 .get_object(id, Some(SuiObjectDataOptions::full_content()))
392 .await
393 .map_err(Error::from)
394 } else {
395 Ok(SuiObjectResponse::new_with_error(
396 SuiObjectResponseError::DynamicFieldNotFound { parent_object_id },
397 ))
398 }
399 })
400 }
401
402 #[instrument(skip(self))]
403 async fn resolve_name_service_address(&self, name: String) -> RpcResult<Option<SuiAddress>> {
404 with_tracing!(async move {
405 let domain = name.parse::<Domain>().map_err(Error::from)?;
407 let record_id = self.name_service_config.record_field_id(&domain);
408
409 let parent_domain = domain.parent();
411 let parent_record_id = self.name_service_config.record_field_id(&parent_domain);
412
413 let current_timestamp_ms = self.get_latest_checkpoint_timestamp_ms()?;
414
415 let mut requests = vec![self.state.get_object(&record_id)];
417
418 if domain.is_subdomain() {
420 requests.push(self.state.get_object(&parent_record_id));
421 }
422
423 let mut results = future::try_join_all(requests).await?;
428
429 let Some(object) = results.remove(0) else {
432 return Ok(None);
433 };
434
435 let name_record = NameRecord::try_from(object)?;
436
437 if !name_record.is_leaf_record() {
440 return if !name_record.is_node_expired(current_timestamp_ms) {
441 Ok(name_record.target_address)
442 } else {
443 Err(Error::from(NameServiceError::NameExpired))
444 };
445 }
446
447 let Some(parent_object) = results.remove(0) else {
451 return Err(Error::from(NameServiceError::NameExpired));
452 };
453
454 let parent_name_record = NameRecord::try_from(parent_object)?;
455
456 if parent_name_record.is_valid_leaf_parent(&name_record)
460 && !parent_name_record.is_node_expired(current_timestamp_ms)
461 {
462 Ok(name_record.target_address)
463 } else {
464 Err(Error::from(NameServiceError::NameExpired))
465 }
466 })
467 }
468
469 #[instrument(skip(self))]
470 async fn resolve_name_service_names(
471 &self,
472 address: SuiAddress,
473 _cursor: Option<ObjectID>,
474 _limit: Option<usize>,
475 ) -> RpcResult<Page<String, ObjectID>> {
476 with_tracing!(async move {
477 let reverse_record_id = self
478 .name_service_config
479 .reverse_record_field_id(address.as_ref());
480
481 let mut result = Page {
482 data: vec![],
483 next_cursor: None,
484 has_next_page: false,
485 };
486
487 let Some(field_reverse_record_object) =
488 self.state.get_object(&reverse_record_id).await?
489 else {
490 return Ok(result);
491 };
492
493 let domain = field_reverse_record_object
494 .to_rust::<Field<SuiAddress, Domain>>()
495 .ok_or_else(|| {
496 Error::UnexpectedError(format!("Malformed Object {reverse_record_id}"))
497 })?
498 .value;
499
500 let domain_name = domain.to_string();
501
502 let resolved_address = self
503 .resolve_name_service_address(domain_name.clone())
504 .await?;
505
506 if resolved_address.is_none() {
508 return Ok(result);
509 }
510
511 result.data.push(domain_name);
514
515 Ok(result)
516 })
517 }
518}
519
520impl<R: ReadApiServer> SuiRpcModule for IndexerApi<R> {
521 fn rpc(self) -> RpcModule<Self> {
522 self.into_rpc()
523 }
524
525 fn rpc_doc_module() -> Module {
526 IndexerApiOpenRpc::module_doc()
527 }
528}