sui_rosetta/
state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use chrono::DateTime;
6use futures::stream::{self, StreamExt, TryStreamExt};
7use prost_types::FieldMask;
8use std::str::FromStr;
9use std::sync::Arc;
10use sui_rpc::client::Client as GrpcClient;
11use sui_rpc::field::FieldMaskUtil;
12use sui_rpc::proto::sui::rpc::v2::{
13    Checkpoint, GetCheckpointRequest, GetServiceInfoRequest, get_checkpoint_request,
14};
15use sui_types::base_types::TransactionDigest;
16use sui_types::digests::CheckpointDigest;
17use sui_types::messages_checkpoint::CheckpointSequenceNumber;
18
19use sui_types::digests::ChainIdentifier;
20
21use crate::operations::Operations;
22use crate::types::{
23    Block, BlockHash, BlockIdentifier, BlockResponse, Transaction, TransactionIdentifier,
24};
25use crate::{CoinMetadataCache, Error};
26
27#[derive(Clone)]
28pub struct OnlineServerContext {
29    pub client: GrpcClient,
30    pub coin_metadata_cache: CoinMetadataCache,
31    pub chain_id: ChainIdentifier,
32    block_provider: Arc<dyn BlockProvider + Send + Sync>,
33}
34
35impl OnlineServerContext {
36    pub fn new(
37        client: GrpcClient,
38        block_provider: Arc<dyn BlockProvider + Send + Sync>,
39        coin_metadata_cache: CoinMetadataCache,
40        chain_id: ChainIdentifier,
41    ) -> Self {
42        Self {
43            client,
44            block_provider,
45            coin_metadata_cache,
46            chain_id,
47        }
48    }
49
50    pub fn blocks(&self) -> &(dyn BlockProvider + Sync + Send) {
51        &*self.block_provider
52    }
53}
54
55#[async_trait]
56pub trait BlockProvider {
57    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error>;
58    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
59    async fn current_block(&self) -> Result<BlockResponse, Error>;
60    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error>;
61    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
62    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
63    async fn create_block_identifier(
64        &self,
65        checkpoint: CheckpointSequenceNumber,
66    ) -> Result<BlockIdentifier, Error>;
67}
68
69#[derive(Clone)]
70pub struct CheckpointBlockProvider {
71    client: GrpcClient,
72    coin_metadata_cache: CoinMetadataCache,
73}
74
75#[async_trait]
76impl BlockProvider for CheckpointBlockProvider {
77    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error> {
78        let request = GetCheckpointRequest::by_sequence_number(index).with_read_mask(
79            FieldMask::from_paths([
80                "sequence_number",
81                "digest",
82                "summary.sequence_number",
83                "summary.previous_digest",
84                "summary.timestamp",
85                "transactions.digest",
86                "transactions.transaction.sender",
87                "transactions.transaction.gas_payment",
88                "transactions.transaction.kind",
89                "transactions.effects.gas_object",
90                "transactions.effects.gas_used",
91                "transactions.effects.status",
92                "transactions.balance_changes",
93                "transactions.events.events.event_type",
94                "transactions.events.events.json",
95            ]),
96        );
97
98        let mut client = self.client.clone();
99        let response = client
100            .ledger_client()
101            .get_checkpoint(request)
102            .await
103            .map_err(|e| Error::from(anyhow::anyhow!("Failed to get checkpoint: {}", e)))?
104            .into_inner();
105
106        let checkpoint = response
107            .checkpoint
108            .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
109
110        self.create_block_response(checkpoint).await
111    }
112
113    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error> {
114        let mut request = GetCheckpointRequest::default().with_read_mask(FieldMask::from_paths([
115            "sequence_number",
116            "digest",
117            "summary.sequence_number",
118            "summary.previous_digest",
119            "summary.timestamp",
120            "transactions.digest",
121            "transactions.transaction.sender",
122            "transactions.transaction.gas_payment",
123            "transactions.transaction.kind",
124            "transactions.effects.gas_object",
125            "transactions.effects.gas_used",
126            "transactions.effects.status",
127            "transactions.balance_changes",
128            "transactions.events.events.event_type",
129            "transactions.events.events.json",
130        ]));
131        request.checkpoint_id = Some(get_checkpoint_request::CheckpointId::Digest(
132            hash.to_string(),
133        ));
134
135        let mut client = self.client.clone();
136        let response = client
137            .ledger_client()
138            .get_checkpoint(request)
139            .await?
140            .into_inner();
141        let checkpoint = response
142            .checkpoint
143            .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
144
145        self.create_block_response(checkpoint).await
146    }
147
148    async fn current_block(&self) -> Result<BlockResponse, Error> {
149        let request = GetCheckpointRequest::latest()
150            .with_read_mask(FieldMask::from_paths(["sequence_number"]));
151
152        let mut client = self.client.clone();
153        let response = client
154            .ledger_client()
155            .get_checkpoint(request)
156            .await?
157            .into_inner();
158
159        let sequence_number = response.checkpoint().sequence_number();
160        self.get_block_by_index(sequence_number).await
161    }
162
163    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error> {
164        let response = self
165            .client
166            .clone()
167            .ledger_client()
168            .get_service_info(GetServiceInfoRequest::default())
169            .await?
170            .into_inner();
171        let chain_id = response
172            .chain_id
173            .ok_or_else(|| Error::DataError("Missing chain_id".to_string()))?;
174        let hash = CheckpointDigest::from_str(&chain_id)?;
175        Ok(BlockIdentifier { index: 0, hash })
176    }
177
178    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
179        let response = self
180            .client
181            .clone()
182            .ledger_client()
183            .get_service_info(GetServiceInfoRequest::default())
184            .await?
185            .into_inner();
186        let lowest = response
187            .lowest_available_checkpoint
188            .ok_or_else(|| Error::DataError("Missing lowest_available_checkpoint".to_string()))?;
189        self.create_block_identifier(lowest).await
190    }
191
192    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error> {
193        let request = GetCheckpointRequest::latest()
194            .with_read_mask(FieldMask::from_paths(["sequence_number"]));
195
196        let response = self
197            .client
198            .clone()
199            .ledger_client()
200            .get_checkpoint(request)
201            .await?
202            .into_inner();
203
204        let checkpoint = response
205            .checkpoint
206            .ok_or_else(|| Error::DataError("Missing checkpoint".to_string()))?;
207
208        let sequence_number = checkpoint.sequence_number();
209
210        self.create_block_identifier(sequence_number).await
211    }
212
213    async fn create_block_identifier(
214        &self,
215        checkpoint: CheckpointSequenceNumber,
216    ) -> Result<BlockIdentifier, Error> {
217        self.create_block_identifier(checkpoint).await
218    }
219}
220
221impl CheckpointBlockProvider {
222    pub fn new(client: GrpcClient, coin_metadata_cache: CoinMetadataCache) -> Self {
223        Self {
224            client,
225            coin_metadata_cache,
226        }
227    }
228
229    async fn create_block_response(&self, checkpoint: Checkpoint) -> Result<BlockResponse, Error> {
230        let summary = checkpoint.summary();
231        let index = summary.sequence_number();
232        let hash = CheckpointDigest::from_str(checkpoint.digest())?;
233        // Genesis checkpoint (index 0) has no previous digest
234        let previous_hash = if index == 0 {
235            hash
236        } else {
237            CheckpointDigest::from_str(summary.previous_digest())?
238        };
239        let timestamp_ms = summary
240            .timestamp
241            .ok_or_else(|| Error::DataError("Checkpoint timestamp is missing".to_string()))
242            .and_then(|ts| {
243                DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
244                    .ok_or_else(|| Error::DataError(format!("Invalid timestamp: {}", ts)))
245            })?
246            .timestamp_millis() as u64;
247
248        let transactions: Vec<Transaction> = stream::iter(checkpoint.transactions)
249            .map(|executed_tx| async move {
250                let digest = TransactionDigest::from_str(executed_tx.digest())?;
251                Ok::<_, Error>(Transaction {
252                    transaction_identifier: TransactionIdentifier { hash: digest },
253                    // This is async because it makes a GetCoinMetadata call if the coin metadata
254                    // isn't already cached.
255                    operations: Operations::try_from_executed_transaction(
256                        executed_tx,
257                        &self.coin_metadata_cache,
258                    )
259                    .await?,
260                    related_transactions: vec![],
261                    metadata: None,
262                })
263            })
264            // A checkpoint can have thousands of transactions so
265            // limit the amount of work a single rosetta request can generate
266            // concurrently to prevent resource starvation issues.
267            .buffer_unordered(10)
268            .try_collect()
269            .await?;
270
271        let parent_block_identifier = if index == 0 {
272            // Genesis block is its own parent
273            BlockIdentifier { index, hash }
274        } else {
275            BlockIdentifier {
276                index: index - 1,
277                hash: previous_hash,
278            }
279        };
280
281        Ok(BlockResponse {
282            block: Block {
283                block_identifier: BlockIdentifier { index, hash },
284                parent_block_identifier,
285                timestamp: timestamp_ms,
286                transactions,
287                metadata: None,
288            },
289            other_transactions: vec![],
290        })
291    }
292
293    async fn create_block_identifier(
294        &self,
295        seq_number: CheckpointSequenceNumber,
296    ) -> Result<BlockIdentifier, Error> {
297        let grpc_request = GetCheckpointRequest::by_sequence_number(seq_number)
298            .with_read_mask(FieldMask::from_paths(["sequence_number", "digest"]));
299        let mut client = self.client.clone();
300        let response = client
301            .ledger_client()
302            .get_checkpoint(grpc_request)
303            .await?
304            .into_inner();
305
306        let checkpoint = response.checkpoint();
307        let index = checkpoint.sequence_number();
308        let hash = checkpoint.digest();
309
310        Ok(BlockIdentifier {
311            index,
312            hash: CheckpointDigest::from_str(hash)?,
313        })
314    }
315}