1use async_trait::async_trait;
5use futures::future::try_join_all;
6use std::sync::Arc;
7use sui_json_rpc_types::SuiTransactionBlockResponseOptions;
8use sui_sdk::SuiClient;
9use sui_sdk::rpc_types::Checkpoint;
10use sui_types::messages_checkpoint::CheckpointSequenceNumber;
11
12use crate::operations::Operations;
13use crate::types::{
14 Block, BlockHash, BlockIdentifier, BlockResponse, Transaction, TransactionIdentifier,
15};
16use crate::{CoinMetadataCache, Error};
17
// Test-only module: compiled only under `cfg(test)`, with its source loaded from
// `unit_tests/balance_changing_tx_tests.rs` instead of the default module path.
#[cfg(test)]
#[path = "unit_tests/balance_changing_tx_tests.rs"]
mod balance_changing_tx_tests;
21
/// Context bundling a Sui client, a coin metadata cache and a block provider.
///
/// The block provider is private and stored behind an [`Arc`] as a trait object;
/// it is exposed read-only through [`OnlineServerContext::blocks`].
#[derive(Clone)]
pub struct OnlineServerContext {
    /// Sui SDK client handle.
    pub client: SuiClient,
    /// Coin metadata cache kept alongside the client.
    pub coin_metadata_cache: CoinMetadataCache,
    /// Source of block data, type-erased so any `BlockProvider` implementation can be used.
    block_provider: Arc<dyn BlockProvider + Send + Sync>,
}
28
29impl OnlineServerContext {
30 pub fn new(
31 client: SuiClient,
32 block_provider: Arc<dyn BlockProvider + Send + Sync>,
33 coin_metadata_cache: CoinMetadataCache,
34 ) -> Self {
35 Self {
36 client: client.clone(),
37 block_provider,
38 coin_metadata_cache,
39 }
40 }
41
42 pub fn blocks(&self) -> &(dyn BlockProvider + Sync + Send) {
43 &*self.block_provider
44 }
45}
46
/// Asynchronous source of block data.
///
/// Every method is fallible and reports failures through the crate's [`Error`] type.
#[async_trait]
pub trait BlockProvider {
    /// Returns the block located at position `index`.
    async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error>;
    /// Returns the block identified by `hash`.
    async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
    /// Returns the block the provider considers current.
    async fn current_block(&self) -> Result<BlockResponse, Error>;
    /// Returns the identifier of the genesis block.
    async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error>;
    /// Returns the identifier of the oldest block.
    async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
    /// Returns the identifier of the current block, without the block contents.
    async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
    /// Builds the block identifier that corresponds to the given checkpoint sequence number.
    async fn create_block_identifier(
        &self,
        checkpoint: CheckpointSequenceNumber,
    ) -> Result<BlockIdentifier, Error>;
}
60
/// [`BlockProvider`] implementation that maps each checkpoint to one block,
/// reading checkpoints and their transactions through a [`SuiClient`].
#[derive(Clone)]
pub struct CheckpointBlockProvider {
    /// Client used for all checkpoint and transaction reads.
    client: SuiClient,
    /// Cache passed to `Operations::try_from_response` when converting transactions.
    coin_metadata_cache: CoinMetadataCache,
}
66
67#[async_trait]
68impl BlockProvider for CheckpointBlockProvider {
69 async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error> {
70 let checkpoint = self.client.read_api().get_checkpoint(index.into()).await?;
71 self.create_block_response(checkpoint).await
72 }
73
74 async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error> {
75 let checkpoint = self.client.read_api().get_checkpoint(hash.into()).await?;
76 self.create_block_response(checkpoint).await
77 }
78
79 async fn current_block(&self) -> Result<BlockResponse, Error> {
80 let checkpoint = self
81 .client
82 .read_api()
83 .get_latest_checkpoint_sequence_number()
84 .await?;
85 self.get_block_by_index(checkpoint).await
86 }
87
88 async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error> {
89 self.create_block_identifier(0).await
90 }
91
92 async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
93 self.create_block_identifier(0).await
94 }
95
96 async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error> {
97 let checkpoint = self
98 .client
99 .read_api()
100 .get_latest_checkpoint_sequence_number()
101 .await?;
102
103 self.create_block_identifier(checkpoint).await
104 }
105
106 async fn create_block_identifier(
107 &self,
108 checkpoint: CheckpointSequenceNumber,
109 ) -> Result<BlockIdentifier, Error> {
110 self.create_block_identifier(checkpoint).await
111 }
112}
113
114impl CheckpointBlockProvider {
115 pub fn new(client: SuiClient, coin_metadata_cache: CoinMetadataCache) -> Self {
116 Self {
117 client,
118 coin_metadata_cache,
119 }
120 }
121
122 async fn create_block_response(&self, checkpoint: Checkpoint) -> Result<BlockResponse, Error> {
123 let index = checkpoint.sequence_number;
124 let hash = checkpoint.digest;
125
126 let chunks = checkpoint
127 .transactions
128 .chunks(5)
129 .map(|batch| async {
130 let transaction_responses = self
131 .client
132 .read_api()
133 .multi_get_transactions_with_options(
134 batch.to_vec(),
135 SuiTransactionBlockResponseOptions::new()
136 .with_input()
137 .with_effects()
138 .with_balance_changes()
139 .with_events(),
140 )
141 .await?;
142
143 let mut transactions = vec![];
144 for tx in transaction_responses.into_iter() {
145 transactions.push(Transaction {
146 transaction_identifier: TransactionIdentifier { hash: tx.digest },
147 operations: Operations::try_from_response(tx, &self.coin_metadata_cache)
148 .await?,
149 related_transactions: vec![],
150 metadata: None,
151 })
152 }
153 Ok::<Vec<_>, anyhow::Error>(transactions)
154 })
155 .collect::<Vec<_>>();
156
157 let transactions = try_join_all(chunks)
158 .await?
159 .into_iter()
160 .flatten()
161 .collect::<Vec<_>>();
162
163 if checkpoint.previous_digest.is_none() && index != 0 {
165 return Err(Error::DataError(format!(
166 "Previous digest is None for checkpoint [{index}], digest: [{hash:?}]"
167 )));
168 }
169
170 let parent_block_identifier = checkpoint
171 .previous_digest
172 .map(|hash| BlockIdentifier {
173 index: index - 1,
174 hash,
175 })
176 .unwrap_or_else(|| BlockIdentifier { index, hash });
177
178 Ok(BlockResponse {
179 block: Block {
180 block_identifier: BlockIdentifier { index, hash },
181 parent_block_identifier,
182 timestamp: checkpoint.timestamp_ms,
183 transactions,
184 metadata: None,
185 },
186 other_transactions: vec![],
187 })
188 }
189
190 async fn create_block_identifier(
191 &self,
192 seq_number: CheckpointSequenceNumber,
193 ) -> Result<BlockIdentifier, Error> {
194 let checkpoint = self
195 .client
196 .read_api()
197 .get_checkpoint(seq_number.into())
198 .await?;
199 Ok(BlockIdentifier {
200 index: checkpoint.sequence_number,
201 hash: checkpoint.digest,
202 })
203 }
204}