sui_indexer_alt_framework/ingestion/
rpc_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::str::FromStr;
5
6use anyhow::Context as _;
7use anyhow::anyhow;
8use async_trait::async_trait;
9use prost_types::FieldMask;
10use sui_rpc::Client as RpcClient;
11use sui_rpc::field::FieldMaskUtil;
12use sui_rpc::proto::sui::rpc::v2::GetCheckpointRequest;
13use sui_rpc::proto::sui::rpc::v2::GetServiceInfoRequest;
14use sui_rpc::proto::sui::rpc::v2::GetServiceInfoResponse;
15use sui_types::digests::ChainIdentifier;
16use sui_types::digests::CheckpointDigest;
17use sui_types::full_checkpoint_content::Checkpoint;
18use tonic::Code;
19
20use crate::ingestion::decode::Error::ProtoConversion;
21use crate::ingestion::ingestion_client::CheckpointError;
22use crate::ingestion::ingestion_client::CheckpointResult;
23use crate::ingestion::ingestion_client::IngestionClientTrait;
24
25#[async_trait]
26impl IngestionClientTrait for RpcClient {
27    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
28        let response = get_service_info_request(self).await?;
29        Ok(CheckpointDigest::from_str(response.chain_id())?.into())
30    }
31
32    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
33        let request: GetCheckpointRequest = GetCheckpointRequest::by_sequence_number(checkpoint)
34            .with_read_mask(FieldMask::from_paths([
35                "summary.bcs",
36                "signature",
37                "contents.bcs",
38                "transactions.transaction.bcs",
39                "transactions.effects.bcs",
40                "transactions.effects.unchanged_loaded_runtime_objects",
41                "transactions.events.bcs",
42                "objects.objects.bcs",
43            ]));
44
45        let response = self
46            .clone()
47            .ledger_client()
48            .get_checkpoint(request)
49            .await
50            .map_err(|status| match status.code() {
51                Code::NotFound => CheckpointError::NotFound,
52                _ => CheckpointError::Fetch(anyhow!(status)),
53            })?;
54
55        // `total_ingested_bytes` is incremented directly by the
56        // `ByteCountMakeCallbackHandler` request layer attached in
57        // `IngestionClient::with_grpc`, so it does not need to be tracked here.
58        let response = response.into_inner();
59        // Proto -> Checkpoint conversion is multi-ms of CPU work; offload to the
60        // blocking pool so it doesn't stall the reactor.
61        tokio::task::spawn_blocking(move || {
62            Checkpoint::try_from(response.checkpoint())
63                .map_err(|e| CheckpointError::Decode(ProtoConversion(e)))
64        })
65        .await
66        .map_err(|e| CheckpointError::Fetch(anyhow!("decode task panicked: {e}")))?
67    }
68
69    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
70        get_service_info_request(self)
71            .await?
72            .checkpoint_height
73            .context("Checkpoint height not found")
74    }
75}
76
77async fn get_service_info_request(
78    rpc_client: &RpcClient,
79) -> anyhow::Result<GetServiceInfoResponse> {
80    let request = GetServiceInfoRequest::const_default();
81    Ok(rpc_client
82        .clone()
83        .ledger_client()
84        .get_service_info(request)
85        .await?
86        .into_inner())
87}