1use std::sync::Arc;
14
15use sui_consistent_store::reader::Reader;
16use sui_types::base_types::TransactionDigest;
17use sui_types::committee::Committee;
18use sui_types::committee::EpochId;
19use sui_types::digests::CheckpointContentsDigest;
20use sui_types::digests::CheckpointDigest;
21use sui_types::effects::TransactionEffects;
22use sui_types::effects::TransactionEvents;
23use sui_types::messages_checkpoint::CheckpointContents;
24use sui_types::messages_checkpoint::CheckpointSequenceNumber;
25use sui_types::messages_checkpoint::VerifiedCheckpoint;
26use sui_types::storage::ObjectKey;
27use sui_types::storage::ReadStore;
28use sui_types::storage::error::Error as StorageError;
29use sui_types::storage::error::Result as StorageResult;
30use sui_types::transaction::VerifiedTransaction;
31use tracing::error;
32
33use crate::reader::RpcStoreReader;
34use crate::schema::primitives::U64Be;
35
36impl<R: Reader + Send + Sync> ReadStore for RpcStoreReader<R> {
37 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
38 match self.schema().get_committee(epoch) {
39 Ok(Some(committee)) => Some(Arc::new(committee)),
40 Ok(None) => None,
41 Err(e) => {
42 error!(epoch, "get_committee: {e:#}");
43 None
44 }
45 }
46 }
47
48 fn get_latest_checkpoint(&self) -> StorageResult<VerifiedCheckpoint> {
49 let latest = self
55 .schema()
56 .checkpoint_summary
57 .iter_rev(..)
58 .map_err(StorageError::custom)?
59 .next();
60 let Some(entry) = latest else {
61 return Err(StorageError::missing("no checkpoints in store"));
62 };
63 let (U64Be(seq), _) = entry.map_err(StorageError::custom)?;
64 self.schema()
65 .get_checkpoint_summary(seq)
66 .map_err(StorageError::custom)?
67 .ok_or_else(|| StorageError::missing(format!("checkpoint {seq} disappeared")))
68 }
69
70 fn get_highest_verified_checkpoint(&self) -> StorageResult<VerifiedCheckpoint> {
71 self.get_latest_checkpoint()
74 }
75
76 fn get_highest_synced_checkpoint(&self) -> StorageResult<VerifiedCheckpoint> {
77 self.get_latest_checkpoint()
83 }
84
85 fn get_lowest_available_checkpoint(&self) -> StorageResult<CheckpointSequenceNumber> {
86 let watermarks = self
87 .schema()
88 .get_pruning_watermarks()
89 .map_err(StorageError::custom)?
90 .unwrap_or_default();
91 Ok(watermarks.checkpoint_lo)
92 }
93
94 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
95 let seq = match self.schema().get_checkpoint_seq_by_digest(digest) {
96 Ok(Some(seq)) => seq,
97 Ok(None) => return None,
98 Err(e) => {
99 error!(?digest, "get_checkpoint_by_digest seq lookup: {e:#}");
100 return None;
101 }
102 };
103 match self.schema().get_checkpoint_summary(seq) {
104 Ok(summary) => summary,
105 Err(e) => {
106 error!(seq, "get_checkpoint_by_digest summary lookup: {e:#}");
107 None
108 }
109 }
110 }
111
112 fn get_checkpoint_by_sequence_number(
113 &self,
114 sequence_number: CheckpointSequenceNumber,
115 ) -> Option<VerifiedCheckpoint> {
116 match self.schema().get_checkpoint_summary(sequence_number) {
117 Ok(summary) => summary,
118 Err(e) => {
119 error!(sequence_number, "get_checkpoint_by_sequence_number: {e:#}");
120 None
121 }
122 }
123 }
124
125 fn get_checkpoint_contents_by_digest(
126 &self,
127 _digest: &CheckpointContentsDigest,
128 ) -> Option<CheckpointContents> {
129 None
136 }
137
138 fn get_checkpoint_contents_by_sequence_number(
139 &self,
140 sequence_number: CheckpointSequenceNumber,
141 ) -> Option<CheckpointContents> {
142 match self.schema().get_checkpoint_contents(sequence_number) {
143 Ok(contents) => contents,
144 Err(e) => {
145 error!(
146 sequence_number,
147 "get_checkpoint_contents_by_sequence_number: {e:#}"
148 );
149 None
150 }
151 }
152 }
153
154 fn get_transaction(&self, tx_digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
155 let tx_seq = match self.schema().get_tx_seq_by_digest(tx_digest) {
156 Ok(Some(seq)) => seq,
157 Ok(None) => return None,
158 Err(e) => {
159 error!(?tx_digest, "get_transaction seq lookup: {e:#}");
160 return None;
161 }
162 };
163 let (transaction, signatures) = match self.schema().get_transaction(tx_seq) {
164 Ok(Some(pair)) => pair,
165 Ok(None) => return None,
166 Err(e) => {
167 error!(tx_seq, "get_transaction data lookup: {e:#}");
168 return None;
169 }
170 };
171 let envelope =
172 sui_types::transaction::Transaction::from_generic_sig_data(transaction, signatures);
173 Some(Arc::new(VerifiedTransaction::new_unchecked(envelope)))
174 }
175
176 fn get_transaction_effects(&self, tx_digest: &TransactionDigest) -> Option<TransactionEffects> {
177 let tx_seq = match self.schema().get_tx_seq_by_digest(tx_digest) {
178 Ok(Some(seq)) => seq,
179 Ok(None) => return None,
180 Err(e) => {
181 error!(?tx_digest, "get_transaction_effects seq lookup: {e:#}");
182 return None;
183 }
184 };
185 match self.schema().get_effects(tx_seq) {
186 Ok(Some((effects, _unchanged))) => Some(effects),
187 Ok(None) => None,
188 Err(e) => {
189 error!(tx_seq, "get_transaction_effects: {e:#}");
190 None
191 }
192 }
193 }
194
195 fn get_events(&self, event_digest: &TransactionDigest) -> Option<TransactionEvents> {
196 let tx_seq = match self.schema().get_tx_seq_by_digest(event_digest) {
200 Ok(Some(seq)) => seq,
201 Ok(None) => return None,
202 Err(e) => {
203 error!(?event_digest, "get_events seq lookup: {e:#}");
204 return None;
205 }
206 };
207 match self.schema().get_events(tx_seq) {
208 Ok(events) => events,
209 Err(e) => {
210 error!(tx_seq, "get_events: {e:#}");
211 None
212 }
213 }
214 }
215
216 fn get_unchanged_loaded_runtime_objects(
217 &self,
218 digest: &TransactionDigest,
219 ) -> Option<Vec<ObjectKey>> {
220 let tx_seq = match self.schema().get_tx_seq_by_digest(digest) {
221 Ok(Some(seq)) => seq,
222 Ok(None) => return None,
223 Err(e) => {
224 error!(
225 ?digest,
226 "get_unchanged_loaded_runtime_objects seq lookup: {e:#}"
227 );
228 return None;
229 }
230 };
231 match self.schema().get_effects(tx_seq) {
232 Ok(Some((_effects, unchanged))) => Some(unchanged),
233 Ok(None) => None,
234 Err(e) => {
235 error!(tx_seq, "get_unchanged_loaded_runtime_objects: {e:#}");
236 None
237 }
238 }
239 }
240
241 fn get_transaction_checkpoint(
242 &self,
243 digest: &TransactionDigest,
244 ) -> Option<CheckpointSequenceNumber> {
245 let tx_seq = match self.schema().get_tx_seq_by_digest(digest) {
246 Ok(Some(seq)) => seq,
247 Ok(None) => return None,
248 Err(e) => {
249 error!(?digest, "get_transaction_checkpoint seq lookup: {e:#}");
250 return None;
251 }
252 };
253 match self.schema().get_tx_metadata_by_seq(tx_seq) {
254 Ok(Some(meta)) => Some(meta.checkpoint_seq),
255 Ok(None) => None,
256 Err(e) => {
257 error!(tx_seq, "get_transaction_checkpoint: {e:#}");
258 None
259 }
260 }
261 }
262
263 fn get_full_checkpoint_contents(
264 &self,
265 _sequence_number: Option<CheckpointSequenceNumber>,
266 _digest: &CheckpointContentsDigest,
267 ) -> Option<sui_types::messages_checkpoint::VersionedFullCheckpointContents> {
268 None
275 }
276}
277
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::crypto::AggregateAuthoritySignature;
    use sui_types::crypto::AuthorityStrongQuorumSignInfo;
    use sui_types::digests::CheckpointDigest;
    use sui_types::digests::TransactionDigest;
    use sui_types::gas::GasCostSummary;
    use sui_types::message_envelope::Message;
    use sui_types::messages_checkpoint::CheckpointSummary;
    use sui_types::storage::ReadStore;

    use crate::RpcStoreSchema;
    use crate::reader::RpcStoreReader;
    use crate::schema::checkpoint_contents;
    use crate::schema::checkpoint_seq_by_digest;
    use crate::schema::checkpoint_summary;
    use crate::schema::primitives::U64Be;
    use crate::schema::primitives::U64Varint;
    use crate::schema::pruning_watermark;

    /// Builds a checkpoint summary for `seq` with every other field zeroed or empty.
    fn dummy_summary(seq: u64) -> CheckpointSummary {
        CheckpointSummary {
            epoch: 0,
            sequence_number: seq,
            network_total_transactions: 0,
            content_digest: sui_types::digests::CheckpointContentsDigest::new([0; 32]),
            previous_digest: None,
            epoch_rolling_gas_cost_summary: GasCostSummary::default(),
            timestamp_ms: 0,
            checkpoint_commitments: vec![],
            end_of_epoch_data: None,
            version_specific_data: vec![],
        }
    }

    /// Builds a quorum signature with a default aggregate signature and no signers.
    fn dummy_signature() -> AuthorityStrongQuorumSignInfo {
        AuthorityStrongQuorumSignInfo {
            epoch: 0,
            signature: AggregateAuthoritySignature::default(),
            signers_map: roaring::RoaringBitmap::new(),
        }
    }

    /// Opens an empty store in a new temporary directory.
    ///
    /// The directory handle is returned alongside the database and reader so
    /// that callers keep it alive for the duration of the test.
    fn fresh_reader() -> (tempfile::TempDir, Db, RpcStoreReader) {
        let tmp = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(tmp.path(), DbOptions::default()).unwrap();
        let reader = RpcStoreReader::new(db.clone(), Arc::new(schema));
        (tmp, db, reader)
    }

    /// Writes a dummy checkpoint for `seq` (summary row plus digest index row)
    /// in a single batch and returns the summary that was stored.
    fn seed_checkpoint(db: &Db, reader: &RpcStoreReader, seq: u64) -> CheckpointSummary {
        let summary = dummy_summary(seq);
        let digest = summary.digest();

        let mut writes = db.batch();
        writes
            .put(
                &reader.schema().checkpoint_summary,
                &U64Be(seq),
                &checkpoint_summary::store(&summary, &dummy_signature()),
            )
            .unwrap();
        writes
            .put(
                &reader.schema().checkpoint_seq_by_digest,
                &checkpoint_seq_by_digest::Key(digest),
                &U64Varint(seq),
            )
            .unwrap();
        writes.commit().unwrap();

        summary
    }

    #[test]
    fn latest_checkpoint_errors_when_empty() {
        let (_dir, _db, reader) = fresh_reader();

        let err = reader.get_latest_checkpoint().unwrap_err();
        let rendered = format!("{err:#}");

        assert!(rendered.contains("no checkpoints"));
    }

    #[test]
    fn latest_checkpoint_returns_highest_seq() {
        let (_dir, db, reader) = fresh_reader();

        // Seed out of order so the result cannot depend on insertion order.
        seed_checkpoint(&db, &reader, 0);
        let highest = seed_checkpoint(&db, &reader, 5);
        seed_checkpoint(&db, &reader, 3);

        let latest = reader.get_latest_checkpoint().unwrap();
        assert_eq!(latest.sequence_number(), highest.sequence_number());
    }

    #[test]
    fn lookup_by_digest_round_trips() {
        let (_dir, db, reader) = fresh_reader();
        let stored = seed_checkpoint(&db, &reader, 7);

        let digest: CheckpointDigest = stored.digest();
        let fetched = reader.get_checkpoint_by_digest(&digest).expect("present");

        assert_eq!(fetched.sequence_number(), stored.sequence_number());
    }

    #[test]
    fn lookup_by_digest_returns_none_for_unknown() {
        let (_dir, _db, reader) = fresh_reader();

        let unknown = CheckpointDigest::new([9; 32]);

        assert!(reader.get_checkpoint_by_digest(&unknown).is_none());
    }

    #[test]
    fn lowest_available_returns_zero_when_unset() {
        let (_dir, _db, reader) = fresh_reader();

        let lowest = reader.get_lowest_available_checkpoint().unwrap();

        assert_eq!(lowest, 0);
    }

    #[test]
    fn lowest_available_reflects_pruning_watermark() {
        let (_dir, db, reader) = fresh_reader();
        let marks = pruning_watermark::Watermarks {
            tx_seq_lo: 100,
            checkpoint_lo: 42,
        };

        let mut writes = db.batch();
        writes
            .put(
                &reader.schema().pruning_watermark,
                &crate::schema::primitives::UnitKey,
                &pruning_watermark::store(&marks).1,
            )
            .unwrap();
        writes.commit().unwrap();

        let lowest = reader.get_lowest_available_checkpoint().unwrap();
        assert_eq!(lowest, 42);
    }

    #[test]
    fn contents_by_seq_round_trips() {
        let (_dir, db, reader) = fresh_reader();
        let seq: u64 = 11;

        let execution = sui_types::base_types::ExecutionDigests {
            transaction: TransactionDigest::new([1; 32]),
            effects: sui_types::digests::TransactionEffectsDigest::new([2; 32]),
        };
        let contents =
            sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                vec![execution],
            );

        let mut writes = db.batch();
        writes
            .put(
                &reader.schema().checkpoint_contents,
                &U64Be(seq),
                &checkpoint_contents::store(&contents),
            )
            .unwrap();
        writes.commit().unwrap();

        let fetched = reader
            .get_checkpoint_contents_by_sequence_number(seq)
            .expect("present");
        assert_eq!(fetched.digest(), contents.digest());
    }
}