sui_rosetta/
state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use futures::future::try_join_all;
6use std::sync::Arc;
7use sui_json_rpc_types::SuiTransactionBlockResponseOptions;
8use sui_sdk::SuiClient;
9use sui_sdk::rpc_types::Checkpoint;
10use sui_types::messages_checkpoint::CheckpointSequenceNumber;
11
12use crate::operations::Operations;
13use crate::types::{
14    Block, BlockHash, BlockIdentifier, BlockResponse, Transaction, TransactionIdentifier,
15};
16use crate::{CoinMetadataCache, Error};
17
18#[cfg(test)]
19#[path = "unit_tests/balance_changing_tx_tests.rs"]
20mod balance_changing_tx_tests;
21
22#[derive(Clone)]
23pub struct OnlineServerContext {
24    pub client: SuiClient,
25    pub coin_metadata_cache: CoinMetadataCache,
26    block_provider: Arc<dyn BlockProvider + Send + Sync>,
27}
28
29impl OnlineServerContext {
30    pub fn new(
31        client: SuiClient,
32        block_provider: Arc<dyn BlockProvider + Send + Sync>,
33        coin_metadata_cache: CoinMetadataCache,
34    ) -> Self {
35        Self {
36            client: client.clone(),
37            block_provider,
38            coin_metadata_cache,
39        }
40    }
41
42    pub fn blocks(&self) -> &(dyn BlockProvider + Sync + Send) {
43        &*self.block_provider
44    }
45}
46
47#[async_trait]
48pub trait BlockProvider {
49    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error>;
50    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
51    async fn current_block(&self) -> Result<BlockResponse, Error>;
52    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error>;
53    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
54    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
55    async fn create_block_identifier(
56        &self,
57        checkpoint: CheckpointSequenceNumber,
58    ) -> Result<BlockIdentifier, Error>;
59}
60
61#[derive(Clone)]
62pub struct CheckpointBlockProvider {
63    client: SuiClient,
64    coin_metadata_cache: CoinMetadataCache,
65}
66
67#[async_trait]
68impl BlockProvider for CheckpointBlockProvider {
69    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error> {
70        let checkpoint = self.client.read_api().get_checkpoint(index.into()).await?;
71        self.create_block_response(checkpoint).await
72    }
73
74    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error> {
75        let checkpoint = self.client.read_api().get_checkpoint(hash.into()).await?;
76        self.create_block_response(checkpoint).await
77    }
78
79    async fn current_block(&self) -> Result<BlockResponse, Error> {
80        let checkpoint = self
81            .client
82            .read_api()
83            .get_latest_checkpoint_sequence_number()
84            .await?;
85        self.get_block_by_index(checkpoint).await
86    }
87
88    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error> {
89        self.create_block_identifier(0).await
90    }
91
92    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
93        self.create_block_identifier(0).await
94    }
95
96    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error> {
97        let checkpoint = self
98            .client
99            .read_api()
100            .get_latest_checkpoint_sequence_number()
101            .await?;
102
103        self.create_block_identifier(checkpoint).await
104    }
105
106    async fn create_block_identifier(
107        &self,
108        checkpoint: CheckpointSequenceNumber,
109    ) -> Result<BlockIdentifier, Error> {
110        self.create_block_identifier(checkpoint).await
111    }
112}
113
114impl CheckpointBlockProvider {
115    pub fn new(client: SuiClient, coin_metadata_cache: CoinMetadataCache) -> Self {
116        Self {
117            client,
118            coin_metadata_cache,
119        }
120    }
121
122    async fn create_block_response(&self, checkpoint: Checkpoint) -> Result<BlockResponse, Error> {
123        let index = checkpoint.sequence_number;
124        let hash = checkpoint.digest;
125
126        let chunks = checkpoint
127            .transactions
128            .chunks(5)
129            .map(|batch| async {
130                let transaction_responses = self
131                    .client
132                    .read_api()
133                    .multi_get_transactions_with_options(
134                        batch.to_vec(),
135                        SuiTransactionBlockResponseOptions::new()
136                            .with_input()
137                            .with_effects()
138                            .with_balance_changes()
139                            .with_events(),
140                    )
141                    .await?;
142
143                let mut transactions = vec![];
144                for tx in transaction_responses.into_iter() {
145                    transactions.push(Transaction {
146                        transaction_identifier: TransactionIdentifier { hash: tx.digest },
147                        operations: Operations::try_from_response(tx, &self.coin_metadata_cache)
148                            .await?,
149                        related_transactions: vec![],
150                        metadata: None,
151                    })
152                }
153                Ok::<Vec<_>, anyhow::Error>(transactions)
154            })
155            .collect::<Vec<_>>();
156
157        let transactions = try_join_all(chunks)
158            .await?
159            .into_iter()
160            .flatten()
161            .collect::<Vec<_>>();
162
163        // previous digest should only be None for genesis block.
164        if checkpoint.previous_digest.is_none() && index != 0 {
165            return Err(Error::DataError(format!(
166                "Previous digest is None for checkpoint [{index}], digest: [{hash:?}]"
167            )));
168        }
169
170        let parent_block_identifier = checkpoint
171            .previous_digest
172            .map(|hash| BlockIdentifier {
173                index: index - 1,
174                hash,
175            })
176            .unwrap_or_else(|| BlockIdentifier { index, hash });
177
178        Ok(BlockResponse {
179            block: Block {
180                block_identifier: BlockIdentifier { index, hash },
181                parent_block_identifier,
182                timestamp: checkpoint.timestamp_ms,
183                transactions,
184                metadata: None,
185            },
186            other_transactions: vec![],
187        })
188    }
189
190    async fn create_block_identifier(
191        &self,
192        seq_number: CheckpointSequenceNumber,
193    ) -> Result<BlockIdentifier, Error> {
194        let checkpoint = self
195            .client
196            .read_api()
197            .get_checkpoint(seq_number.into())
198            .await?;
199        Ok(BlockIdentifier {
200            index: checkpoint.sequence_number,
201            hash: checkpoint.digest,
202        })
203    }
204}