1use async_trait::async_trait;
5use chrono::DateTime;
6use futures::stream::{self, StreamExt, TryStreamExt};
7use prost_types::FieldMask;
8use std::str::FromStr;
9use std::sync::Arc;
10use sui_rpc::client::Client as GrpcClient;
11use sui_rpc::field::FieldMaskUtil;
12use sui_rpc::proto::sui::rpc::v2::{Checkpoint, GetCheckpointRequest, get_checkpoint_request};
13use sui_types::base_types::TransactionDigest;
14use sui_types::digests::CheckpointDigest;
15use sui_types::messages_checkpoint::CheckpointSequenceNumber;
16
17use crate::operations::Operations;
18use crate::types::{
19 Block, BlockHash, BlockIdentifier, BlockResponse, Transaction, TransactionIdentifier,
20};
21use crate::{CoinMetadataCache, Error};
22
23#[derive(Clone)]
24pub struct OnlineServerContext {
25 pub client: GrpcClient,
26 pub coin_metadata_cache: CoinMetadataCache,
27 block_provider: Arc<dyn BlockProvider + Send + Sync>,
28}
29
30impl OnlineServerContext {
31 pub fn new(
32 client: GrpcClient,
33 block_provider: Arc<dyn BlockProvider + Send + Sync>,
34 coin_metadata_cache: CoinMetadataCache,
35 ) -> Self {
36 Self {
37 client,
38 block_provider,
39 coin_metadata_cache,
40 }
41 }
42
43 pub fn blocks(&self) -> &(dyn BlockProvider + Sync + Send) {
44 &*self.block_provider
45 }
46}
47
48#[async_trait]
49pub trait BlockProvider {
50 async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error>;
51 async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
52 async fn current_block(&self) -> Result<BlockResponse, Error>;
53 async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error>;
54 async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
55 async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
56 async fn create_block_identifier(
57 &self,
58 checkpoint: CheckpointSequenceNumber,
59 ) -> Result<BlockIdentifier, Error>;
60}
61
62#[derive(Clone)]
63pub struct CheckpointBlockProvider {
64 client: GrpcClient,
65 coin_metadata_cache: CoinMetadataCache,
66}
67
68#[async_trait]
69impl BlockProvider for CheckpointBlockProvider {
70 async fn get_block_by_index(&self, index: u64) -> Result<BlockResponse, Error> {
71 let request = GetCheckpointRequest::by_sequence_number(index).with_read_mask(
72 FieldMask::from_paths([
73 "sequence_number",
74 "digest",
75 "summary.sequence_number",
76 "summary.previous_digest",
77 "summary.timestamp",
78 "transactions.digest",
79 "transactions.transaction.sender",
80 "transactions.transaction.gas_payment",
81 "transactions.transaction.kind",
82 "transactions.effects.gas_object",
83 "transactions.effects.gas_used",
84 "transactions.effects.status",
85 "transactions.balance_changes",
86 "transactions.events.events.event_type",
87 "transactions.events.events.json",
88 ]),
89 );
90
91 let mut client = self.client.clone();
92 let response = client
93 .ledger_client()
94 .get_checkpoint(request)
95 .await
96 .map_err(|e| Error::from(anyhow::anyhow!("Failed to get checkpoint: {}", e)))?
97 .into_inner();
98
99 let checkpoint = response
100 .checkpoint
101 .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
102
103 self.create_block_response(checkpoint).await
104 }
105
106 async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error> {
107 let mut request = GetCheckpointRequest::default().with_read_mask(FieldMask::from_paths([
108 "sequence_number",
109 "digest",
110 "summary.sequence_number",
111 "summary.previous_digest",
112 "summary.timestamp",
113 "transactions.digest",
114 "transactions.transaction.sender",
115 "transactions.transaction.gas_payment",
116 "transactions.transaction.kind",
117 "transactions.effects.gas_object",
118 "transactions.effects.gas_used",
119 "transactions.effects.status",
120 "transactions.balance_changes",
121 "transactions.events.events.event_type",
122 "transactions.events.events.json",
123 ]));
124 request.checkpoint_id = Some(get_checkpoint_request::CheckpointId::Digest(
125 hash.to_string(),
126 ));
127
128 let mut client = self.client.clone();
129 let response = client
130 .ledger_client()
131 .get_checkpoint(request)
132 .await?
133 .into_inner();
134 let checkpoint = response
135 .checkpoint
136 .ok_or_else(|| Error::DataError("Checkpoint not found".to_string()))?;
137
138 self.create_block_response(checkpoint).await
139 }
140
141 async fn current_block(&self) -> Result<BlockResponse, Error> {
142 let request = GetCheckpointRequest::latest()
143 .with_read_mask(FieldMask::from_paths(["sequence_number"]));
144
145 let mut client = self.client.clone();
146 let response = client
147 .ledger_client()
148 .get_checkpoint(request)
149 .await?
150 .into_inner();
151
152 let sequence_number = response.checkpoint().sequence_number();
153 self.get_block_by_index(sequence_number).await
154 }
155
156 async fn genesis_block_identifier(&self) -> Result<BlockIdentifier, Error> {
157 self.create_block_identifier(0).await
158 }
159
160 async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
161 self.create_block_identifier(0).await
162 }
163
164 async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error> {
165 let request = GetCheckpointRequest::latest()
166 .with_read_mask(FieldMask::from_paths(["sequence_number"]));
167
168 let response = self
169 .client
170 .clone()
171 .ledger_client()
172 .get_checkpoint(request)
173 .await?
174 .into_inner();
175
176 let checkpoint = response
177 .checkpoint
178 .ok_or_else(|| Error::DataError("Missing checkpoint".to_string()))?;
179
180 let sequence_number = checkpoint.sequence_number();
181
182 self.create_block_identifier(sequence_number).await
183 }
184
185 async fn create_block_identifier(
186 &self,
187 checkpoint: CheckpointSequenceNumber,
188 ) -> Result<BlockIdentifier, Error> {
189 self.create_block_identifier(checkpoint).await
190 }
191}
192
193impl CheckpointBlockProvider {
194 pub fn new(client: GrpcClient, coin_metadata_cache: CoinMetadataCache) -> Self {
195 Self {
196 client,
197 coin_metadata_cache,
198 }
199 }
200
201 async fn create_block_response(&self, checkpoint: Checkpoint) -> Result<BlockResponse, Error> {
202 let summary = checkpoint.summary();
203 let index = summary.sequence_number();
204 let hash = CheckpointDigest::from_str(checkpoint.digest())?;
205 let previous_hash = if index == 0 {
207 hash
208 } else {
209 CheckpointDigest::from_str(summary.previous_digest())?
210 };
211 let timestamp_ms = summary
212 .timestamp
213 .ok_or_else(|| Error::DataError("Checkpoint timestamp is missing".to_string()))
214 .and_then(|ts| {
215 DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
216 .ok_or_else(|| Error::DataError(format!("Invalid timestamp: {}", ts)))
217 })?
218 .timestamp_millis() as u64;
219
220 let transactions: Vec<Transaction> = stream::iter(checkpoint.transactions)
221 .map(|executed_tx| async move {
222 let digest = TransactionDigest::from_str(executed_tx.digest())?;
223 Ok::<_, Error>(Transaction {
224 transaction_identifier: TransactionIdentifier { hash: digest },
225 operations: Operations::try_from_executed_transaction(
228 executed_tx,
229 &self.coin_metadata_cache,
230 )
231 .await?,
232 related_transactions: vec![],
233 metadata: None,
234 })
235 })
236 .buffer_unordered(10)
240 .try_collect()
241 .await?;
242
243 let parent_block_identifier = if index == 0 {
244 BlockIdentifier { index, hash }
246 } else {
247 BlockIdentifier {
248 index: index - 1,
249 hash: previous_hash,
250 }
251 };
252
253 Ok(BlockResponse {
254 block: Block {
255 block_identifier: BlockIdentifier { index, hash },
256 parent_block_identifier,
257 timestamp: timestamp_ms,
258 transactions,
259 metadata: None,
260 },
261 other_transactions: vec![],
262 })
263 }
264
265 async fn create_block_identifier(
266 &self,
267 seq_number: CheckpointSequenceNumber,
268 ) -> Result<BlockIdentifier, Error> {
269 let grpc_request = GetCheckpointRequest::by_sequence_number(seq_number)
270 .with_read_mask(FieldMask::from_paths(["sequence_number", "digest"]));
271 let mut client = self.client.clone();
272 let response = client
273 .ledger_client()
274 .get_checkpoint(grpc_request)
275 .await?
276 .into_inner();
277
278 let checkpoint = response.checkpoint();
279 let index = checkpoint.sequence_number();
280 let hash = checkpoint.digest();
281
282 Ok(BlockIdentifier {
283 index,
284 hash: CheckpointDigest::from_str(hash)?,
285 })
286 }
287}