sui_rpc_store/reader/
read_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! [`ReadStore`] adapter — checkpoints, committees, transactions,
5//! effects, events.
6//!
7//! All point lookups delegate to the inherent helpers on
8//! [`RpcStoreSchema`]. Trait methods that return [`Option`] suppress
9//! storage errors and log them at `error` level; trait methods that
10//! return [`Result`] surface them as
11//! [`sui_types::storage::error::Error`] via `Error::custom`.
12
13use std::sync::Arc;
14
15use sui_consistent_store::reader::Reader;
16use sui_types::base_types::TransactionDigest;
17use sui_types::committee::Committee;
18use sui_types::committee::EpochId;
19use sui_types::digests::CheckpointContentsDigest;
20use sui_types::digests::CheckpointDigest;
21use sui_types::effects::TransactionEffects;
22use sui_types::effects::TransactionEvents;
23use sui_types::messages_checkpoint::CheckpointContents;
24use sui_types::messages_checkpoint::CheckpointSequenceNumber;
25use sui_types::messages_checkpoint::VerifiedCheckpoint;
26use sui_types::storage::ObjectKey;
27use sui_types::storage::ReadStore;
28use sui_types::storage::error::Error as StorageError;
29use sui_types::storage::error::Result as StorageResult;
30use sui_types::transaction::VerifiedTransaction;
31use tracing::error;
32
33use crate::reader::RpcStoreReader;
34use crate::schema::primitives::U64Be;
35
36impl<R: Reader + Send + Sync> ReadStore for RpcStoreReader<R> {
37    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
38        match self.schema().get_committee(epoch) {
39            Ok(Some(committee)) => Some(Arc::new(committee)),
40            Ok(None) => None,
41            Err(e) => {
42                error!(epoch, "get_committee: {e:#}");
43                None
44            }
45        }
46    }
47
48    fn get_latest_checkpoint(&self) -> StorageResult<VerifiedCheckpoint> {
49        // The latest checkpoint header in `checkpoint_summary` is
50        // the highest committed checkpoint. Read paths that require
51        // every CF to be in sync at this checkpoint should be
52        // routed through `at_snapshot` instead — there is no
53        // ambient "min watermark across pipelines" guarantee here.
54        let latest = self
55            .schema()
56            .checkpoint_summary
57            .iter_rev(..)
58            .map_err(StorageError::custom)?
59            .next();
60        let Some(entry) = latest else {
61            return Err(StorageError::missing("no checkpoints in store"));
62        };
63        let (U64Be(seq), _) = entry.map_err(StorageError::custom)?;
64        self.schema()
65            .get_checkpoint_summary(seq)
66            .map_err(StorageError::custom)?
67            .ok_or_else(|| StorageError::missing(format!("checkpoint {seq} disappeared")))
68    }
69
70    fn get_highest_verified_checkpoint(&self) -> StorageResult<VerifiedCheckpoint> {
71        // We only commit checkpoints that have been verified, so
72        // "highest verified" coincides with "latest committed".
73        self.get_latest_checkpoint()
74    }
75
76    fn get_highest_synced_checkpoint(&self) -> StorageResult<VerifiedCheckpoint> {
77        // Likewise: we only commit checkpoints once their
78        // contents/transactions/effects have been ingested, so
79        // "highest synced" coincides with "latest committed". A
80        // checkpoint header without its companion CFs is not a
81        // state this store represents.
82        self.get_latest_checkpoint()
83    }
84
85    fn get_lowest_available_checkpoint(&self) -> StorageResult<CheckpointSequenceNumber> {
86        let watermarks = self
87            .schema()
88            .get_pruning_watermarks()
89            .map_err(StorageError::custom)?
90            .unwrap_or_default();
91        Ok(watermarks.checkpoint_lo)
92    }
93
94    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
95        let seq = match self.schema().get_checkpoint_seq_by_digest(digest) {
96            Ok(Some(seq)) => seq,
97            Ok(None) => return None,
98            Err(e) => {
99                error!(?digest, "get_checkpoint_by_digest seq lookup: {e:#}");
100                return None;
101            }
102        };
103        match self.schema().get_checkpoint_summary(seq) {
104            Ok(summary) => summary,
105            Err(e) => {
106                error!(seq, "get_checkpoint_by_digest summary lookup: {e:#}");
107                None
108            }
109        }
110    }
111
112    fn get_checkpoint_by_sequence_number(
113        &self,
114        sequence_number: CheckpointSequenceNumber,
115    ) -> Option<VerifiedCheckpoint> {
116        match self.schema().get_checkpoint_summary(sequence_number) {
117            Ok(summary) => summary,
118            Err(e) => {
119                error!(sequence_number, "get_checkpoint_by_sequence_number: {e:#}");
120                None
121            }
122        }
123    }
124
125    fn get_checkpoint_contents_by_digest(
126        &self,
127        _digest: &CheckpointContentsDigest,
128    ) -> Option<CheckpointContents> {
129        // Lookup by content digest would require a separate
130        // `CheckpointContentsDigest → seq` index that this store
131        // does not currently maintain. Callers that have a content
132        // digest in hand are typically following a checkpoint
133        // header they already located by sequence number; they
134        // should use that sequence number directly.
135        None
136    }
137
138    fn get_checkpoint_contents_by_sequence_number(
139        &self,
140        sequence_number: CheckpointSequenceNumber,
141    ) -> Option<CheckpointContents> {
142        match self.schema().get_checkpoint_contents(sequence_number) {
143            Ok(contents) => contents,
144            Err(e) => {
145                error!(
146                    sequence_number,
147                    "get_checkpoint_contents_by_sequence_number: {e:#}"
148                );
149                None
150            }
151        }
152    }
153
154    fn get_transaction(&self, tx_digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
155        let tx_seq = match self.schema().get_tx_seq_by_digest(tx_digest) {
156            Ok(Some(seq)) => seq,
157            Ok(None) => return None,
158            Err(e) => {
159                error!(?tx_digest, "get_transaction seq lookup: {e:#}");
160                return None;
161            }
162        };
163        let (transaction, signatures) = match self.schema().get_transaction(tx_seq) {
164            Ok(Some(pair)) => pair,
165            Ok(None) => return None,
166            Err(e) => {
167                error!(tx_seq, "get_transaction data lookup: {e:#}");
168                return None;
169            }
170        };
171        let envelope =
172            sui_types::transaction::Transaction::from_generic_sig_data(transaction, signatures);
173        Some(Arc::new(VerifiedTransaction::new_unchecked(envelope)))
174    }
175
176    fn get_transaction_effects(&self, tx_digest: &TransactionDigest) -> Option<TransactionEffects> {
177        let tx_seq = match self.schema().get_tx_seq_by_digest(tx_digest) {
178            Ok(Some(seq)) => seq,
179            Ok(None) => return None,
180            Err(e) => {
181                error!(?tx_digest, "get_transaction_effects seq lookup: {e:#}");
182                return None;
183            }
184        };
185        match self.schema().get_effects(tx_seq) {
186            Ok(Some((effects, _unchanged))) => Some(effects),
187            Ok(None) => None,
188            Err(e) => {
189                error!(tx_seq, "get_transaction_effects: {e:#}");
190                None
191            }
192        }
193    }
194
195    fn get_events(&self, event_digest: &TransactionDigest) -> Option<TransactionEvents> {
196        // `event_digest` is named for the trait but our index
197        // resolves by transaction digest (events are keyed by
198        // tx_seq in this store).
199        let tx_seq = match self.schema().get_tx_seq_by_digest(event_digest) {
200            Ok(Some(seq)) => seq,
201            Ok(None) => return None,
202            Err(e) => {
203                error!(?event_digest, "get_events seq lookup: {e:#}");
204                return None;
205            }
206        };
207        match self.schema().get_events(tx_seq) {
208            Ok(events) => events,
209            Err(e) => {
210                error!(tx_seq, "get_events: {e:#}");
211                None
212            }
213        }
214    }
215
216    fn get_unchanged_loaded_runtime_objects(
217        &self,
218        digest: &TransactionDigest,
219    ) -> Option<Vec<ObjectKey>> {
220        let tx_seq = match self.schema().get_tx_seq_by_digest(digest) {
221            Ok(Some(seq)) => seq,
222            Ok(None) => return None,
223            Err(e) => {
224                error!(
225                    ?digest,
226                    "get_unchanged_loaded_runtime_objects seq lookup: {e:#}"
227                );
228                return None;
229            }
230        };
231        match self.schema().get_effects(tx_seq) {
232            Ok(Some((_effects, unchanged))) => Some(unchanged),
233            Ok(None) => None,
234            Err(e) => {
235                error!(tx_seq, "get_unchanged_loaded_runtime_objects: {e:#}");
236                None
237            }
238        }
239    }
240
241    fn get_transaction_checkpoint(
242        &self,
243        digest: &TransactionDigest,
244    ) -> Option<CheckpointSequenceNumber> {
245        let tx_seq = match self.schema().get_tx_seq_by_digest(digest) {
246            Ok(Some(seq)) => seq,
247            Ok(None) => return None,
248            Err(e) => {
249                error!(?digest, "get_transaction_checkpoint seq lookup: {e:#}");
250                return None;
251            }
252        };
253        match self.schema().get_tx_metadata_by_seq(tx_seq) {
254            Ok(Some(meta)) => Some(meta.checkpoint_seq),
255            Ok(None) => None,
256            Err(e) => {
257                error!(tx_seq, "get_transaction_checkpoint: {e:#}");
258                None
259            }
260        }
261    }
262
263    fn get_full_checkpoint_contents(
264        &self,
265        _sequence_number: Option<CheckpointSequenceNumber>,
266        _digest: &CheckpointContentsDigest,
267    ) -> Option<sui_types::messages_checkpoint::VersionedFullCheckpointContents> {
268        // State-sync path. `VersionedFullCheckpointContents`
269        // bundles transactions + signatures + effects for an entire
270        // checkpoint; assembling it would require iterating the
271        // checkpoint's contents and joining each row across
272        // `transactions` + `effects`. Not on the rpc-api hot path,
273        // so leave it as a follow-up.
274        None
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use std::sync::Arc;
281
282    use sui_consistent_store::Db;
283    use sui_consistent_store::DbOptions;
284    use sui_types::crypto::AggregateAuthoritySignature;
285    use sui_types::crypto::AuthorityStrongQuorumSignInfo;
286    use sui_types::digests::CheckpointDigest;
287    use sui_types::digests::TransactionDigest;
288    use sui_types::gas::GasCostSummary;
289    use sui_types::message_envelope::Message;
290    use sui_types::messages_checkpoint::CheckpointSummary;
291    use sui_types::storage::ReadStore;
292
293    use crate::RpcStoreSchema;
294    use crate::reader::RpcStoreReader;
295    use crate::schema::checkpoint_contents;
296    use crate::schema::checkpoint_seq_by_digest;
297    use crate::schema::checkpoint_summary;
298    use crate::schema::primitives::U64Be;
299    use crate::schema::primitives::U64Varint;
300    use crate::schema::pruning_watermark;
301
302    fn dummy_summary(seq: u64) -> CheckpointSummary {
303        CheckpointSummary {
304            epoch: 0,
305            sequence_number: seq,
306            network_total_transactions: 0,
307            content_digest: sui_types::digests::CheckpointContentsDigest::new([0; 32]),
308            previous_digest: None,
309            epoch_rolling_gas_cost_summary: GasCostSummary::default(),
310            timestamp_ms: 0,
311            checkpoint_commitments: vec![],
312            end_of_epoch_data: None,
313            version_specific_data: vec![],
314        }
315    }
316
317    fn dummy_signature() -> AuthorityStrongQuorumSignInfo {
318        AuthorityStrongQuorumSignInfo {
319            epoch: 0,
320            signature: AggregateAuthoritySignature::default(),
321            signers_map: roaring::RoaringBitmap::new(),
322        }
323    }
324
325    fn fresh_reader() -> (tempfile::TempDir, Db, RpcStoreReader) {
326        let dir = tempfile::tempdir().unwrap();
327        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
328        let reader = RpcStoreReader::new(db.clone(), Arc::new(schema));
329        (dir, db, reader)
330    }
331
332    fn seed_checkpoint(db: &Db, reader: &RpcStoreReader, seq: u64) -> CheckpointSummary {
333        let summary = dummy_summary(seq);
334        let digest = summary.digest();
335        let mut batch = db.batch();
336        batch
337            .put(
338                &reader.schema().checkpoint_summary,
339                &U64Be(seq),
340                &checkpoint_summary::store(&summary, &dummy_signature()),
341            )
342            .unwrap();
343        batch
344            .put(
345                &reader.schema().checkpoint_seq_by_digest,
346                &checkpoint_seq_by_digest::Key(digest),
347                &U64Varint(seq),
348            )
349            .unwrap();
350        batch.commit().unwrap();
351        summary
352    }
353
354    #[test]
355    fn latest_checkpoint_errors_when_empty() {
356        let (_dir, _db, reader) = fresh_reader();
357        let err = reader.get_latest_checkpoint().unwrap_err();
358        assert!(format!("{err:#}").contains("no checkpoints"));
359    }
360
361    #[test]
362    fn latest_checkpoint_returns_highest_seq() {
363        let (_dir, db, reader) = fresh_reader();
364        seed_checkpoint(&db, &reader, 0);
365        let s5 = seed_checkpoint(&db, &reader, 5);
366        seed_checkpoint(&db, &reader, 3);
367
368        let latest = reader.get_latest_checkpoint().unwrap();
369        assert_eq!(latest.sequence_number(), s5.sequence_number());
370    }
371
372    #[test]
373    fn lookup_by_digest_round_trips() {
374        let (_dir, db, reader) = fresh_reader();
375        let summary = seed_checkpoint(&db, &reader, 7);
376        let digest: CheckpointDigest = summary.digest();
377        let read = reader.get_checkpoint_by_digest(&digest).expect("present");
378        assert_eq!(read.sequence_number(), summary.sequence_number());
379    }
380
381    #[test]
382    fn lookup_by_digest_returns_none_for_unknown() {
383        let (_dir, _db, reader) = fresh_reader();
384        let digest = CheckpointDigest::new([9; 32]);
385        assert!(reader.get_checkpoint_by_digest(&digest).is_none());
386    }
387
388    #[test]
389    fn lowest_available_returns_zero_when_unset() {
390        let (_dir, _db, reader) = fresh_reader();
391        assert_eq!(reader.get_lowest_available_checkpoint().unwrap(), 0);
392    }
393
394    #[test]
395    fn lowest_available_reflects_pruning_watermark() {
396        let (_dir, db, reader) = fresh_reader();
397        let mut batch = db.batch();
398        batch
399            .put(
400                &reader.schema().pruning_watermark,
401                &crate::schema::primitives::UnitKey,
402                &pruning_watermark::store(&pruning_watermark::Watermarks {
403                    tx_seq_lo: 100,
404                    checkpoint_lo: 42,
405                })
406                .1,
407            )
408            .unwrap();
409        batch.commit().unwrap();
410        assert_eq!(reader.get_lowest_available_checkpoint().unwrap(), 42);
411    }
412
413    #[test]
414    fn contents_by_seq_round_trips() {
415        let (_dir, db, reader) = fresh_reader();
416        let contents =
417            sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
418                vec![sui_types::base_types::ExecutionDigests {
419                    transaction: TransactionDigest::new([1; 32]),
420                    effects: sui_types::digests::TransactionEffectsDigest::new([2; 32]),
421                }],
422            );
423        let mut batch = db.batch();
424        batch
425            .put(
426                &reader.schema().checkpoint_contents,
427                &U64Be(11),
428                &checkpoint_contents::store(&contents),
429            )
430            .unwrap();
431        batch.commit().unwrap();
432
433        let read = reader
434            .get_checkpoint_contents_by_sequence_number(11)
435            .expect("present");
436        assert_eq!(read.digest(), contents.digest());
437    }
438}