1use async_trait::async_trait;
5use chrono::DateTime;
6use futures::stream::{self, StreamExt, TryStreamExt};
7use prost_types::FieldMask;
8use std::str::FromStr;
9use std::sync::Arc;
10use sui_rpc::client::Client as GrpcClient;
11use sui_rpc::field::FieldMaskUtil;
12use sui_rpc::proto::sui::rpc::v2::{
13 Checkpoint, GetCheckpointRequest, GetServiceInfoRequest, get_checkpoint_request,
14};
15use sui_types::base_types::TransactionDigest;
16use sui_types::digests::CheckpointDigest;
17use sui_types::messages_checkpoint::CheckpointSequenceNumber;
18
19use sui_types::digests::ChainIdentifier;
20
21use crate::operations::Operations;
22use crate::types::{
23 Block, BlockHash, BlockIdentifier, BlockResponse, Transaction, TransactionIdentifier,
24};
25use crate::{CoinMetadataCache, Error};
26
27#[derive(Clone)]
28pub struct OnlineServerContext {
29 pub client: GrpcClient,
30 pub coin_metadata_cache: CoinMetadataCache,
31 pub chain_id: ChainIdentifier,
32 block_provider: Arc<dyn BlockProvider + Send + Sync>,
33}
34
35impl OnlineServerContext {
36 pub fn new(
37 client: GrpcClient,
38 block_provider: Arc<dyn BlockProvider + Send + Sync>,
39 coin_metadata_cache: CoinMetadataCache,
40 chain_id: ChainIdentifier,
41 ) -> Self {
42 Self {
43 client,
44 block_provider,
45 coin_metadata_cache,
46 chain_id,
47 }
48 }
49
50 pub fn blocks(&self) -> &(dyn BlockProvider + Sync + Send) {
51 &*self.block_provider
52 }
53}
54
55#[async_trait]
56pub trait BlockProvider {
57 async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error>;
58 async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
59 async fn current_block(&self) -> Result<BlockResponse, Error>;
60 async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error>;
61 async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
62 async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
63 async fn create_block_identifier(
64 &self,
65 checkpoint: CheckpointSequenceNumber,
66 ) -> Result<BlockIdentifier, Error>;
67}
68
69#[derive(Clone)]
70pub struct CheckpointBlockProvider {
71 client: GrpcClient,
72 coin_metadata_cache: CoinMetadataCache,
73}
74
75#[async_trait]
76impl BlockProvider for CheckpointBlockProvider {
77 async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error> {
78 let request = GetCheckpointRequest::by_sequence_number(index).with_read_mask(
79 FieldMask::from_paths([
80 "sequence_number",
81 "digest",
82 "summary.sequence_number",
83 "summary.previous_digest",
84 "summary.timestamp",
85 "transactions.digest",
86 "transactions.transaction.sender",
87 "transactions.transaction.gas_payment",
88 "transactions.transaction.kind",
89 "transactions.effects.gas_object",
90 "transactions.effects.gas_used",
91 "transactions.effects.status",
92 "transactions.balance_changes",
93 "transactions.events.events.event_type",
94 "transactions.events.events.json",
95 ]),
96 );
97
98 let mut client = self.client.clone();
99 let response = client
100 .ledger_client()
101 .get_checkpoint(request)
102 .await
103 .map_err(|e| Error::from(anyhow::anyhow!("Failed to get checkpoint: {}", e)))?
104 .into_inner();
105
106 let checkpoint = response
107 .checkpoint
108 .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
109
110 self.create_block_response(checkpoint).await
111 }
112
113 async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error> {
114 let mut request = GetCheckpointRequest::default().with_read_mask(FieldMask::from_paths([
115 "sequence_number",
116 "digest",
117 "summary.sequence_number",
118 "summary.previous_digest",
119 "summary.timestamp",
120 "transactions.digest",
121 "transactions.transaction.sender",
122 "transactions.transaction.gas_payment",
123 "transactions.transaction.kind",
124 "transactions.effects.gas_object",
125 "transactions.effects.gas_used",
126 "transactions.effects.status",
127 "transactions.balance_changes",
128 "transactions.events.events.event_type",
129 "transactions.events.events.json",
130 ]));
131 request.checkpoint_id = Some(get_checkpoint_request::CheckpointId::Digest(
132 hash.to_string(),
133 ));
134
135 let mut client = self.client.clone();
136 let response = client
137 .ledger_client()
138 .get_checkpoint(request)
139 .await?
140 .into_inner();
141 let checkpoint = response
142 .checkpoint
143 .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
144
145 self.create_block_response(checkpoint).await
146 }
147
148 async fn current_block(&self) -> Result<BlockResponse, Error> {
149 let request = GetCheckpointRequest::latest()
150 .with_read_mask(FieldMask::from_paths(["sequence_number"]));
151
152 let mut client = self.client.clone();
153 let response = client
154 .ledger_client()
155 .get_checkpoint(request)
156 .await?
157 .into_inner();
158
159 let sequence_number = response.checkpoint().sequence_number();
160 self.get_block_by_index(sequence_number).await
161 }
162
163 async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error> {
164 let response = self
165 .client
166 .clone()
167 .ledger_client()
168 .get_service_info(GetServiceInfoRequest::default())
169 .await?
170 .into_inner();
171 let chain_id = response
172 .chain_id
173 .ok_or_else(|| Error::DataError("Missing chain_id".to_string()))?;
174 let hash = CheckpointDigest::from_str(&chain_id)?;
175 Ok(BlockIdentifier { index: 0, hash })
176 }
177
178 async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
179 let response = self
180 .client
181 .clone()
182 .ledger_client()
183 .get_service_info(GetServiceInfoRequest::default())
184 .await?
185 .into_inner();
186 let lowest = response
187 .lowest_available_checkpoint
188 .ok_or_else(|| Error::DataError("Missing lowest_available_checkpoint".to_string()))?;
189 self.create_block_identifier(lowest).await
190 }
191
192 async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error> {
193 let request = GetCheckpointRequest::latest()
194 .with_read_mask(FieldMask::from_paths(["sequence_number"]));
195
196 let response = self
197 .client
198 .clone()
199 .ledger_client()
200 .get_checkpoint(request)
201 .await?
202 .into_inner();
203
204 let checkpoint = response
205 .checkpoint
206 .ok_or_else(|| Error::DataError("Missing checkpoint".to_string()))?;
207
208 let sequence_number = checkpoint.sequence_number();
209
210 self.create_block_identifier(sequence_number).await
211 }
212
213 async fn create_block_identifier(
214 &self,
215 checkpoint: CheckpointSequenceNumber,
216 ) -> Result<BlockIdentifier, Error> {
217 self.create_block_identifier(checkpoint).await
218 }
219}
220
221impl CheckpointBlockProvider {
222 pub fn new(client: GrpcClient, coin_metadata_cache: CoinMetadataCache) -> Self {
223 Self {
224 client,
225 coin_metadata_cache,
226 }
227 }
228
229 async fn create_block_response(&self, checkpoint: Checkpoint) -> Result<BlockResponse, Error> {
230 let summary = checkpoint.summary();
231 let index = summary.sequence_number();
232 let hash = CheckpointDigest::from_str(checkpoint.digest())?;
233 let previous_hash = if index == 0 {
235 hash
236 } else {
237 CheckpointDigest::from_str(summary.previous_digest())?
238 };
239 let timestamp_ms = summary
240 .timestamp
241 .ok_or_else(|| Error::DataError("Checkpoint timestamp is missing".to_string()))
242 .and_then(|ts| {
243 DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
244 .ok_or_else(|| Error::DataError(format!("Invalid timestamp: {}", ts)))
245 })?
246 .timestamp_millis() as u64;
247
248 let transactions: Vec<Transaction> = stream::iter(checkpoint.transactions)
249 .map(|executed_tx| async move {
250 let digest = TransactionDigest::from_str(executed_tx.digest())?;
251 Ok::<_, Error>(Transaction {
252 transaction_identifier: TransactionIdentifier { hash: digest },
253 operations: Operations::try_from_executed_transaction(
256 executed_tx,
257 &self.coin_metadata_cache,
258 )
259 .await?,
260 related_transactions: vec![],
261 metadata: None,
262 })
263 })
264 .buffer_unordered(10)
268 .try_collect()
269 .await?;
270
271 let parent_block_identifier = if index == 0 {
272 BlockIdentifier { index, hash }
274 } else {
275 BlockIdentifier {
276 index: index - 1,
277 hash: previous_hash,
278 }
279 };
280
281 Ok(BlockResponse {
282 block: Block {
283 block_identifier: BlockIdentifier { index, hash },
284 parent_block_identifier,
285 timestamp: timestamp_ms,
286 transactions,
287 metadata: None,
288 },
289 other_transactions: vec![],
290 })
291 }
292
293 async fn create_block_identifier(
294 &self,
295 seq_number: CheckpointSequenceNumber,
296 ) -> Result<BlockIdentifier, Error> {
297 let grpc_request = GetCheckpointRequest::by_sequence_number(seq_number)
298 .with_read_mask(FieldMask::from_paths(["sequence_number", "digest"]));
299 let mut client = self.client.clone();
300 let response = client
301 .ledger_client()
302 .get_checkpoint(grpc_request)
303 .await?
304 .into_inner();
305
306 let checkpoint = response.checkpoint();
307 let index = checkpoint.sequence_number();
308 let hash = checkpoint.digest();
309
310 Ok(BlockIdentifier {
311 index,
312 hash: CheckpointDigest::from_str(hash)?,
313 })
314 }
315}