sui_indexer_alt_framework/ingestion/
rpc_client.rs1use std::str::FromStr;
5
6use anyhow::Context as _;
7use anyhow::anyhow;
8use async_trait::async_trait;
9use prost_types::FieldMask;
10use sui_rpc::Client as RpcClient;
11use sui_rpc::field::FieldMaskUtil;
12use sui_rpc::proto::sui::rpc::v2::GetCheckpointRequest;
13use sui_rpc::proto::sui::rpc::v2::GetServiceInfoRequest;
14use sui_rpc::proto::sui::rpc::v2::GetServiceInfoResponse;
15use sui_types::digests::ChainIdentifier;
16use sui_types::digests::CheckpointDigest;
17use sui_types::full_checkpoint_content::Checkpoint;
18use tonic::Code;
19
20use crate::ingestion::decode::Error::ProtoConversion;
21use crate::ingestion::ingestion_client::CheckpointError;
22use crate::ingestion::ingestion_client::CheckpointResult;
23use crate::ingestion::ingestion_client::IngestionClientTrait;
24
25#[async_trait]
26impl IngestionClientTrait for RpcClient {
27 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
28 let response = get_service_info_request(self).await?;
29 Ok(CheckpointDigest::from_str(response.chain_id())?.into())
30 }
31
32 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
33 let request: GetCheckpointRequest = GetCheckpointRequest::by_sequence_number(checkpoint)
34 .with_read_mask(FieldMask::from_paths([
35 "summary.bcs",
36 "signature",
37 "contents.bcs",
38 "transactions.transaction.bcs",
39 "transactions.effects.bcs",
40 "transactions.effects.unchanged_loaded_runtime_objects",
41 "transactions.events.bcs",
42 "objects.objects.bcs",
43 ]));
44
45 let response = self
46 .clone()
47 .ledger_client()
48 .get_checkpoint(request)
49 .await
50 .map_err(|status| match status.code() {
51 Code::NotFound => CheckpointError::NotFound,
52 _ => CheckpointError::Fetch(anyhow!(status)),
53 })?;
54
55 let response = response.into_inner();
59 tokio::task::spawn_blocking(move || {
62 Checkpoint::try_from(response.checkpoint())
63 .map_err(|e| CheckpointError::Decode(ProtoConversion(e)))
64 })
65 .await
66 .map_err(|e| CheckpointError::Fetch(anyhow!("decode task panicked: {e}")))?
67 }
68
69 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
70 get_service_info_request(self)
71 .await?
72 .checkpoint_height
73 .context("Checkpoint height not found")
74 }
75}
76
77async fn get_service_info_request(
78 rpc_client: &RpcClient,
79) -> anyhow::Result<GetServiceInfoResponse> {
80 let request = GetServiceInfoRequest::const_default();
81 Ok(rpc_client
82 .clone()
83 .ledger_client()
84 .get_service_info(request)
85 .await?
86 .into_inner())
87}