sui_json_rpc/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::collections::HashSet;
4use std::sync::Arc;
5
6use anyhow::bail;
7use async_trait::async_trait;
8use futures::{Stream, StreamExt, future};
9use jsonrpsee::{
10    PendingSubscriptionSink, RpcModule,
11    core::{RpcResult, SubscriptionResult},
12};
13use move_bytecode_utils::layout::TypeLayoutBuilder;
14use move_core_types::language_storage::TypeTag;
15use mysten_metrics::spawn_monitored_task;
16use serde::Serialize;
17use sui_core::authority::AuthorityState;
18use sui_json::SuiJsonValue;
19use sui_json_rpc_api::{
20    IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
21    cap_page_limit, validate_limit,
22};
23use sui_json_rpc_types::{
24    DynamicFieldPage, EventFilter, EventPage, ObjectsPage, Page, SuiObjectDataOptions,
25    SuiObjectResponse, SuiObjectResponseQuery, SuiTransactionBlockResponse,
26    SuiTransactionBlockResponseQuery, TransactionBlocksPage, TransactionFilter,
27};
28use sui_name_service::{Domain, NameRecord, NameServiceConfig, NameServiceError};
29use sui_open_rpc::Module;
30use sui_storage::key_value_store::TransactionKeyValueStore;
31use sui_types::{
32    base_types::{ObjectID, SuiAddress},
33    digests::TransactionDigest,
34    dynamic_field::{DynamicFieldName, Field},
35    error::SuiObjectResponseError,
36    event::EventID,
37};
38use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39use tracing::{instrument, warn};
40
41use crate::{
42    SuiRpcModule,
43    authority_state::{StateRead, StateReadResult},
44    error::{Error, SuiRpcInputError},
45    with_tracing,
46};
47
48pub fn spawn_subscription<S, T>(
49    sink: PendingSubscriptionSink,
50    mut rx: S,
51    permit: Option<OwnedSemaphorePermit>,
52) where
53    S: Stream<Item = T> + Unpin + Send + 'static,
54    T: Serialize + Send,
55{
56    spawn_monitored_task!(async move {
57        let Ok(sink) = sink.accept().await else {
58            return;
59        };
60        let _permit = permit;
61
62        while let Some(item) = rx.next().await {
63            let Ok(message) = jsonrpsee::server::SubscriptionMessage::from_json(&item) else {
64                break;
65            };
66            let Ok(()) = sink.send(message).await else {
67                break;
68            };
69        }
70
71        //         match sink.pipe_from_stream(rx).await {
72        //             SubscriptionClosed::Success => {
73        //                 debug!("Subscription completed.");
74        //                 sink.close(SubscriptionClosed::Success);
75        //             }
76        //             SubscriptionClosed::RemotePeerAborted => {
77        //                 debug!("Subscription aborted by remote peer.");
78        //                 sink.close(SubscriptionClosed::RemotePeerAborted);
79        //             }
80        //             SubscriptionClosed::Failed(err) => {
81        //                 debug!("Subscription failed: {err:?}");
82        //                 sink.close(err);
83        //             }
84        //         };
85    });
86}
87const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
88
89pub struct IndexerApi<R> {
90    state: Arc<dyn StateRead>,
91    read_api: R,
92    transaction_kv_store: Arc<TransactionKeyValueStore>,
93    name_service_config: NameServiceConfig,
94    pub metrics: Arc<JsonRpcMetrics>,
95    subscription_semaphore: Arc<Semaphore>,
96}
97
98impl<R: ReadApiServer> IndexerApi<R> {
99    pub fn new(
100        state: Arc<AuthorityState>,
101        read_api: R,
102        transaction_kv_store: Arc<TransactionKeyValueStore>,
103        name_service_config: NameServiceConfig,
104        metrics: Arc<JsonRpcMetrics>,
105        max_subscriptions: Option<usize>,
106    ) -> Self {
107        let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
108        Self {
109            state,
110            transaction_kv_store,
111            read_api,
112            name_service_config,
113            metrics,
114            subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
115        }
116    }
117
118    fn extract_values_from_dynamic_field_name(
119        &self,
120        name: DynamicFieldName,
121    ) -> Result<(TypeTag, Vec<u8>), SuiRpcInputError> {
122        let DynamicFieldName {
123            type_: name_type,
124            value,
125        } = name;
126        let epoch_store = self.state.load_epoch_store_one_call_per_task();
127        let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
128        let sui_json_value = SuiJsonValue::new(value)?;
129        let name_bcs_value = sui_json_value.to_bcs_bytes(&layout)?;
130        Ok((name_type, name_bcs_value))
131    }
132
133    fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
134        match self.subscription_semaphore.clone().try_acquire_owned() {
135            Ok(p) => Ok(p),
136            Err(_) => bail!("Resources exhausted"),
137        }
138    }
139
140    fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
141        let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
142
143        let checkpoint = self
144            .state
145            .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
146
147        Ok(checkpoint.timestamp_ms)
148    }
149}
150
151#[async_trait]
152impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
153    #[instrument(skip(self))]
154    async fn get_owned_objects(
155        &self,
156        address: SuiAddress,
157        query: Option<SuiObjectResponseQuery>,
158        cursor: Option<ObjectID>,
159        limit: Option<usize>,
160    ) -> RpcResult<ObjectsPage> {
161        with_tracing!(async move {
162            let limit =
163                validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(SuiRpcInputError::from)?;
164            self.metrics.get_owned_objects_limit.observe(limit as f64);
165            let SuiObjectResponseQuery { filter, options } = query.unwrap_or_default();
166            let options = options.unwrap_or_default();
167            let mut objects = self
168                .state
169                .get_owner_objects_with_limit(address, cursor, limit + 1, filter)
170                .map_err(Error::from)?;
171
172            // objects here are of size (limit + 1), where the last one is the cursor for the next page
173            let has_next_page = objects.len() > limit;
174            objects.truncate(limit);
175            let next_cursor = objects
176                .last()
177                .cloned()
178                .map_or(cursor, |o_info| Some(o_info.object_id));
179
180            let data = match options.is_not_in_object_info() {
181                true => {
182                    let object_ids = objects.iter().map(|obj| obj.object_id).collect();
183                    self.read_api
184                        .multi_get_objects(object_ids, Some(options))
185                        .await?
186                }
187                false => objects
188                    .into_iter()
189                    .map(|o_info| SuiObjectResponse::try_from((o_info, options.clone())))
190                    .collect::<Result<Vec<SuiObjectResponse>, _>>()?,
191            };
192
193            self.metrics
194                .get_owned_objects_result_size
195                .observe(data.len() as f64);
196            self.metrics
197                .get_owned_objects_result_size_total
198                .inc_by(data.len() as u64);
199            Ok(Page {
200                data,
201                next_cursor,
202                has_next_page,
203            })
204        })
205    }
206
207    #[instrument(skip(self))]
208    async fn query_transaction_blocks(
209        &self,
210        query: SuiTransactionBlockResponseQuery,
211        // If `Some`, the query will start from the next item after the specified cursor
212        cursor: Option<TransactionDigest>,
213        limit: Option<usize>,
214        descending_order: Option<bool>,
215    ) -> RpcResult<TransactionBlocksPage> {
216        with_tracing!(async move {
217            let limit = cap_page_limit(limit);
218            self.metrics.query_tx_blocks_limit.observe(limit as f64);
219            let descending = descending_order.unwrap_or_default();
220            let opts = query.options.unwrap_or_default();
221
222            // Retrieve 1 extra item for next cursor
223            let mut digests = self
224                .state
225                .get_transactions(
226                    &self.transaction_kv_store,
227                    query.filter,
228                    cursor,
229                    Some(limit + 1),
230                    descending,
231                )
232                .await
233                .map_err(Error::from)?;
234            // De-dup digests, duplicate digests are possible, for example,
235            // when get_transactions_by_move_function with module or function being None.
236            let mut seen = HashSet::new();
237            digests.retain(|digest| seen.insert(*digest));
238
239            // extract next cursor
240            let has_next_page = digests.len() > limit;
241            digests.truncate(limit);
242            let next_cursor = digests.last().cloned().map_or(cursor, Some);
243
244            let data: Vec<SuiTransactionBlockResponse> = if opts.only_digest() {
245                digests
246                    .into_iter()
247                    .map(SuiTransactionBlockResponse::new)
248                    .collect()
249            } else {
250                self.read_api
251                    .multi_get_transaction_blocks(digests, Some(opts))
252                    .await?
253            };
254
255            self.metrics
256                .query_tx_blocks_result_size
257                .observe(data.len() as f64);
258            self.metrics
259                .query_tx_blocks_result_size_total
260                .inc_by(data.len() as u64);
261            Ok(Page {
262                data,
263                next_cursor,
264                has_next_page,
265            })
266        })
267    }
268    #[instrument(skip(self))]
269    async fn query_events(
270        &self,
271        query: EventFilter,
272        // exclusive cursor if `Some`, otherwise start from the beginning
273        cursor: Option<EventID>,
274        limit: Option<usize>,
275        descending_order: Option<bool>,
276    ) -> RpcResult<EventPage> {
277        with_tracing!(async move {
278            let descending = descending_order.unwrap_or_default();
279            let limit = cap_page_limit(limit);
280            self.metrics.query_events_limit.observe(limit as f64);
281            // Retrieve 1 extra item for next cursor
282            let mut data = self
283                .state
284                .query_events(
285                    &self.transaction_kv_store,
286                    query,
287                    cursor,
288                    limit + 1,
289                    descending,
290                )
291                .await
292                .map_err(Error::from)?;
293            let has_next_page = data.len() > limit;
294            data.truncate(limit);
295            let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
296            self.metrics
297                .query_events_result_size
298                .observe(data.len() as f64);
299            self.metrics
300                .query_events_result_size_total
301                .inc_by(data.len() as u64);
302            Ok(EventPage {
303                data,
304                next_cursor,
305                has_next_page,
306            })
307        })
308    }
309
310    #[instrument(skip(self))]
311    fn subscribe_event(
312        &self,
313        sink: PendingSubscriptionSink,
314        filter: EventFilter,
315    ) -> SubscriptionResult {
316        let permit = self.acquire_subscribe_permit()?;
317        spawn_subscription(
318            sink,
319            self.state
320                .get_subscription_handler()
321                .subscribe_events(filter),
322            Some(permit),
323        );
324        Ok(())
325    }
326
327    fn subscribe_transaction(
328        &self,
329        sink: PendingSubscriptionSink,
330        filter: TransactionFilter,
331    ) -> SubscriptionResult {
332        let permit = self.acquire_subscribe_permit()?;
333        spawn_subscription(
334            sink,
335            self.state
336                .get_subscription_handler()
337                .subscribe_transactions(filter),
338            Some(permit),
339        );
340        Ok(())
341    }
342
343    #[instrument(skip(self))]
344    async fn get_dynamic_fields(
345        &self,
346        parent_object_id: ObjectID,
347        // If `Some`, the query will start from the next item after the specified cursor
348        cursor: Option<ObjectID>,
349        limit: Option<usize>,
350    ) -> RpcResult<DynamicFieldPage> {
351        with_tracing!(async move {
352            let limit = cap_page_limit(limit);
353            self.metrics.get_dynamic_fields_limit.observe(limit as f64);
354            let mut data = self
355                .state
356                .get_dynamic_fields(parent_object_id, cursor, limit + 1)
357                .map_err(Error::from)?;
358            let has_next_page = data.len() > limit;
359            data.truncate(limit);
360            let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
361            self.metrics
362                .get_dynamic_fields_result_size
363                .observe(data.len() as f64);
364            self.metrics
365                .get_dynamic_fields_result_size_total
366                .inc_by(data.len() as u64);
367            Ok(DynamicFieldPage {
368                data: data.into_iter().map(|(_, w)| w.into()).collect(),
369                next_cursor,
370                has_next_page,
371            })
372        })
373    }
374
375    #[instrument(skip(self))]
376    async fn get_dynamic_field_object(
377        &self,
378        parent_object_id: ObjectID,
379        name: DynamicFieldName,
380    ) -> RpcResult<SuiObjectResponse> {
381        with_tracing!(async move {
382            let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
383
384            let id = self
385                .state
386                .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
387                .map_err(Error::from)?;
388            // TODO(chris): add options to `get_dynamic_field_object` API as well
389            if let Some(id) = id {
390                self.read_api
391                    .get_object(id, Some(SuiObjectDataOptions::full_content()))
392                    .await
393                    .map_err(Error::from)
394            } else {
395                Ok(SuiObjectResponse::new_with_error(
396                    SuiObjectResponseError::DynamicFieldNotFound { parent_object_id },
397                ))
398            }
399        })
400    }
401
402    #[instrument(skip(self))]
403    async fn resolve_name_service_address(&self, name: String) -> RpcResult<Option<SuiAddress>> {
404        with_tracing!(async move {
405            // prepare the requested domain's field id.
406            let domain = name.parse::<Domain>().map_err(Error::from)?;
407            let record_id = self.name_service_config.record_field_id(&domain);
408
409            // prepare the parent's field id.
410            let parent_domain = domain.parent();
411            let parent_record_id = self.name_service_config.record_field_id(&parent_domain);
412
413            let current_timestamp_ms = self.get_latest_checkpoint_timestamp_ms()?;
414
415            // Do these two reads in parallel.
416            let mut requests = vec![self.state.get_object(&record_id)];
417
418            // Also add the parent in the DB reads if the requested domain is a subdomain.
419            if domain.is_subdomain() {
420                requests.push(self.state.get_object(&parent_record_id));
421            }
422
423            // Couldn't find a `multi_get_object` for this crate (looks like it uses a k,v db)
424            // Always fetching both parent + child at the same time (even for node subdomains),
425            // to avoid sequential db reads. We do this because we do not know if the requested
426            // domain is a node subdomain or a leaf subdomain, and we can save a trip to the db.
427            let mut results = future::try_join_all(requests).await?;
428
429            // Removing without checking vector len, since it is known (== 1 or 2 depending on whether
430            // it is a subdomain or not).
431            let Some(object) = results.remove(0) else {
432                return Ok(None);
433            };
434
435            let name_record = NameRecord::try_from(object)?;
436
437            // Handling SLD names & node subdomains is the same (we handle them as `node` records)
438            // We check their expiration, and if not expired, return the target address.
439            if !name_record.is_leaf_record() {
440                return if !name_record.is_node_expired(current_timestamp_ms) {
441                    Ok(name_record.target_address)
442                } else {
443                    Err(Error::from(NameServiceError::NameExpired))
444                };
445            }
446
447            // == Handle leaf subdomains case ==
448            // We can remove since we know that if we're here, we have a parent
449            // (which also means we queried it in the future above).
450            let Some(parent_object) = results.remove(0) else {
451                return Err(Error::from(NameServiceError::NameExpired));
452            };
453
454            let parent_name_record = NameRecord::try_from(parent_object)?;
455
456            // For a leaf record, we check that:
457            // 1. The parent is a valid parent for that leaf record
458            // 2. The parent is not expired
459            if parent_name_record.is_valid_leaf_parent(&name_record)
460                && !parent_name_record.is_node_expired(current_timestamp_ms)
461            {
462                Ok(name_record.target_address)
463            } else {
464                Err(Error::from(NameServiceError::NameExpired))
465            }
466        })
467    }
468
469    #[instrument(skip(self))]
470    async fn resolve_name_service_names(
471        &self,
472        address: SuiAddress,
473        _cursor: Option<ObjectID>,
474        _limit: Option<usize>,
475    ) -> RpcResult<Page<String, ObjectID>> {
476        with_tracing!(async move {
477            let reverse_record_id = self
478                .name_service_config
479                .reverse_record_field_id(address.as_ref());
480
481            let mut result = Page {
482                data: vec![],
483                next_cursor: None,
484                has_next_page: false,
485            };
486
487            let Some(field_reverse_record_object) =
488                self.state.get_object(&reverse_record_id).await?
489            else {
490                return Ok(result);
491            };
492
493            let domain = field_reverse_record_object
494                .to_rust::<Field<SuiAddress, Domain>>()
495                .ok_or_else(|| {
496                    Error::UnexpectedError(format!("Malformed Object {reverse_record_id}"))
497                })?
498                .value;
499
500            let domain_name = domain.to_string();
501
502            let resolved_address = self
503                .resolve_name_service_address(domain_name.clone())
504                .await?;
505
506            // If looking up the domain returns an empty result, we return an empty result.
507            if resolved_address.is_none() {
508                return Ok(result);
509            }
510
511            // TODO(manos): Discuss why is this even a paginated response.
512            // This API is always going to return a single domain name.
513            result.data.push(domain_name);
514
515            Ok(result)
516        })
517    }
518}
519
520impl<R: ReadApiServer> SuiRpcModule for IndexerApi<R> {
521    fn rpc(self) -> RpcModule<Self> {
522        self.into_rpc()
523    }
524
525    fn rpc_doc_module() -> Module {
526        IndexerApiOpenRpc::module_doc()
527    }
528}