sui_rosetta/
state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use chrono::DateTime;
6use futures::stream::{self, StreamExt, TryStreamExt};
7use prost_types::FieldMask;
8use std::str::FromStr;
9use std::sync::Arc;
10use sui_rpc::client::Client as GrpcClient;
11use sui_rpc::field::FieldMaskUtil;
12use sui_rpc::proto::sui::rpc::v2::{Checkpoint, GetCheckpointRequest, get_checkpoint_request};
13use sui_types::base_types::TransactionDigest;
14use sui_types::digests::CheckpointDigest;
15use sui_types::messages_checkpoint::CheckpointSequenceNumber;
16
17use crate::operations::Operations;
18use crate::types::{
19    Block, BlockHash, BlockIdentifier, BlockResponse, Transaction, TransactionIdentifier,
20};
21use crate::{CoinMetadataCache, Error};
22
23#[derive(Clone)]
24pub struct OnlineServerContext {
25    pub client: GrpcClient,
26    pub coin_metadata_cache: CoinMetadataCache,
27    block_provider: Arc<dyn BlockProvider + Send + Sync>,
28}
29
30impl OnlineServerContext {
31    pub fn new(
32        client: GrpcClient,
33        block_provider: Arc<dyn BlockProvider + Send + Sync>,
34        coin_metadata_cache: CoinMetadataCache,
35    ) -> Self {
36        Self {
37            client,
38            block_provider,
39            coin_metadata_cache,
40        }
41    }
42
43    pub fn blocks(&self) -> &(dyn BlockProvider + Sync + Send) {
44        &*self.block_provider
45    }
46}
47
48#[async_trait]
49pub trait BlockProvider {
50    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error>;
51    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
52    async fn current_block(&self) -> Result<BlockResponse, Error>;
53    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error>;
54    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
55    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
56    async fn create_block_identifier(
57        &self,
58        checkpoint: CheckpointSequenceNumber,
59    ) -> Result<BlockIdentifier, Error>;
60}
61
62#[derive(Clone)]
63pub struct CheckpointBlockProvider {
64    client: GrpcClient,
65    coin_metadata_cache: CoinMetadataCache,
66}
67
68#[async_trait]
69impl BlockProvider for CheckpointBlockProvider {
70    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error> {
71        let request = GetCheckpointRequest::by_sequence_number(index).with_read_mask(
72            FieldMask::from_paths([
73                "sequence_number",
74                "digest",
75                "summary.sequence_number",
76                "summary.previous_digest",
77                "summary.timestamp",
78                "transactions.digest",
79                "transactions.transaction.sender",
80                "transactions.transaction.gas_payment",
81                "transactions.transaction.kind",
82                "transactions.effects.gas_object",
83                "transactions.effects.gas_used",
84                "transactions.effects.status",
85                "transactions.balance_changes",
86                "transactions.events.events.event_type",
87                "transactions.events.events.json",
88            ]),
89        );
90
91        let mut client = self.client.clone();
92        let response = client
93            .ledger_client()
94            .get_checkpoint(request)
95            .await
96            .map_err(|e| Error::from(anyhow::anyhow!("Failed to get checkpoint: {}", e)))?
97            .into_inner();
98
99        let checkpoint = response
100            .checkpoint
101            .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
102
103        self.create_block_response(checkpoint).await
104    }
105
106    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error> {
107        let mut request = GetCheckpointRequest::default().with_read_mask(FieldMask::from_paths([
108            "sequence_number",
109            "digest",
110            "summary.sequence_number",
111            "summary.previous_digest",
112            "summary.timestamp",
113            "transactions.digest",
114            "transactions.transaction.sender",
115            "transactions.transaction.gas_payment",
116            "transactions.transaction.kind",
117            "transactions.effects.gas_object",
118            "transactions.effects.gas_used",
119            "transactions.effects.status",
120            "transactions.balance_changes",
121            "transactions.events.events.event_type",
122            "transactions.events.events.json",
123        ]));
124        request.checkpoint_id = Some(get_checkpoint_request::CheckpointId::Digest(
125            hash.to_string(),
126        ));
127
128        let mut client = self.client.clone();
129        let response = client
130            .ledger_client()
131            .get_checkpoint(request)
132            .await?
133            .into_inner();
134        let checkpoint = response
135            .checkpoint
136            .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
137
138        self.create_block_response(checkpoint).await
139    }
140
141    async fn current_block(&self) -> Result<BlockResponse, Error> {
142        let request = GetCheckpointRequest::latest()
143            .with_read_mask(FieldMask::from_paths(["sequence_number"]));
144
145        let mut client = self.client.clone();
146        let response = client
147            .ledger_client()
148            .get_checkpoint(request)
149            .await?
150            .into_inner();
151
152        let sequence_number = response.checkpoint().sequence_number();
153        self.get_block_by_index(sequence_number).await
154    }
155
156    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error> {
157        self.create_block_identifier(0).await
158    }
159
160    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
161        self.create_block_identifier(0).await
162    }
163
164    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error> {
165        let request = GetCheckpointRequest::latest()
166            .with_read_mask(FieldMask::from_paths(["sequence_number"]));
167
168        let response = self
169            .client
170            .clone()
171            .ledger_client()
172            .get_checkpoint(request)
173            .await?
174            .into_inner();
175
176        let checkpoint = response
177            .checkpoint
178            .ok_or_else(|| Error::DataError("Missing checkpoint".to_string()))?;
179
180        let sequence_number = checkpoint.sequence_number();
181
182        self.create_block_identifier(sequence_number).await
183    }
184
185    async fn create_block_identifier(
186        &self,
187        checkpoint: CheckpointSequenceNumber,
188    ) -> Result<BlockIdentifier, Error> {
189        self.create_block_identifier(checkpoint).await
190    }
191}
192
193impl CheckpointBlockProvider {
194    pub fn new(client: GrpcClient, coin_metadata_cache: CoinMetadataCache) -> Self {
195        Self {
196            client,
197            coin_metadata_cache,
198        }
199    }
200
201    async fn create_block_response(&self, checkpoint: Checkpoint) -> Result<BlockResponse, Error> {
202        let summary = checkpoint.summary();
203        let index = summary.sequence_number();
204        let hash = CheckpointDigest::from_str(checkpoint.digest())?;
205        // Genesis checkpoint (index 0) has no previous digest
206        let previous_hash = if index == 0 {
207            hash
208        } else {
209            CheckpointDigest::from_str(summary.previous_digest())?
210        };
211        let timestamp_ms = summary
212            .timestamp
213            .ok_or_else(|| Error::DataError("Checkpoint timestamp is missing".to_string()))
214            .and_then(|ts| {
215                DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
216                    .ok_or_else(|| Error::DataError(format!("Invalid timestamp: {}", ts)))
217            })?
218            .timestamp_millis() as u64;
219
220        let transactions: Vec<Transaction> = stream::iter(checkpoint.transactions)
221            .map(|executed_tx| async move {
222                let digest = TransactionDigest::from_str(executed_tx.digest())?;
223                Ok::<_, Error>(Transaction {
224                    transaction_identifier: TransactionIdentifier { hash: digest },
225                    // This is async because it makes a GetCoinMetadata call if the coin metadata
226                    // isn't already cached.
227                    operations: Operations::try_from_executed_transaction(
228                        executed_tx,
229                        &self.coin_metadata_cache,
230                    )
231                    .await?,
232                    related_transactions: vec![],
233                    metadata: None,
234                })
235            })
236            // A checkpoint can have thousands of transactions so
237            // limit the amount of work a single rosetta request can generate
238            // concurrently to prevent resource starvation issues.
239            .buffer_unordered(10)
240            .try_collect()
241            .await?;
242
243        let parent_block_identifier = if index == 0 {
244            // Genesis block is its own parent
245            BlockIdentifier { index, hash }
246        } else {
247            BlockIdentifier {
248                index: index - 1,
249                hash: previous_hash,
250            }
251        };
252
253        Ok(BlockResponse {
254            block: Block {
255                block_identifier: BlockIdentifier { index, hash },
256                parent_block_identifier,
257                timestamp: timestamp_ms,
258                transactions,
259                metadata: None,
260            },
261            other_transactions: vec![],
262        })
263    }
264
265    async fn create_block_identifier(
266        &self,
267        seq_number: CheckpointSequenceNumber,
268    ) -> Result<BlockIdentifier, Error> {
269        let grpc_request = GetCheckpointRequest::by_sequence_number(seq_number)
270            .with_read_mask(FieldMask::from_paths(["sequence_number", "digest"]));
271        let mut client = self.client.clone();
272        let response = client
273            .ledger_client()
274            .get_checkpoint(grpc_request)
275            .await?
276            .into_inner();
277
278        let checkpoint = response.checkpoint();
279        let index = checkpoint.sequence_number();
280        let hash = checkpoint.digest();
281
282        Ok(BlockIdentifier {
283            index,
284            hash: CheckpointDigest::from_str(hash)?,
285        })
286    }
287}